我们团队面临一个棘手的技术挑战:开发一款支持多人实时协作的移动应用。核心功能类似一个共享的白板,用户的每一个操作(绘制、移动、删除元素)都需要近乎实时地同步给所有在线的协作者。同时,应用必须支持离线操作,在网络恢复后能自动同步状态,且不能产生数据冲突。
在第一个 Sprint 的技术预研阶段,我们迅速排除了传统的基于 CRUD API 和轮询的方案。这种方法在处理高并发写入和状态一致性时会变得异常复杂,极易出现竞态条件,并且对客户端和服务器的资源消耗巨大。我们需要一个更稳健、更具扩展性的模型。
最终,我们决定采用事件溯源(Event Sourcing, ES)作为核心架构模式。在这种模式下,我们不存储应用当前的状态,而是将导致状态变更的每一个“事件”作为事实(source of truth)持久化下来。应用的状态可以通过重放这些事件流来随时重建。这个模型天然地解决了我们的核心痛点:
- 审计与追溯: 拥有完整的事件历史,可以轻松回溯到任意时间点的状态。
- 离线同步: 客户端在离线时可以将操作暂存为事件队列,在线后一次性提交。
- 状态重建与多视图: 可以基于同一事件流构建出多个不同的“读取模型”(Read Model),以满足不同的查询需求(CQRS 模式)。
技术栈的选择同样关键。我们需要一个能够处理高并发事件流、容错性强且运维成本低的后端,以及一个高效的跨平台移动端框架。我们的最终决策是:
- 后端语言: Elixir。基于 BEAM 虚拟机,其轻量级进程和强大的并发模型(Actor Model)是处理海量独立事件的绝佳工具。其“let it crash”的哲学也为我们构建高容错系统提供了信心。
- 运行环境: AWS Lambda。事件驱动的本质与 Serverless 完美契合。每个事件都可以触发一个独立的 Lambda 执行,实现了极致的弹性伸缩和成本效益。
- 事件存储: Amazon DynamoDB。其单毫秒级延迟、强大的扩展性以及与 Lambda 的原生集成(DynamoDB Streams)使其成为事件存储的理想选择。
- 移动端: React Native。跨平台能力可以显著加速我们在 Scrum 迭代中的交付速度。
整个项目在 Scrum 框架下推进。我们将架构的搭建和验证分解到多个 Sprint 中,每个 Sprint 都有明确的可交付增量,例如 “实现事件的原子性写入” 或 “构建第一个读取模型投影”。
架构核心:事件的写入与存储
系统的入口是一个 API Gateway,它接收来自 React Native 客户端的“命令”(Command)。这些命令代表用户的意图,例如 CreateElement
或 MoveElement
。API Gateway 将命令转发给我们的第一个核心 Lambda 函数:Command-Handler
。
这个函数负责验证命令、将其转换为一个或多个不可变的“事件”(Event),然后原子性地将事件写入 DynamoDB。在真实项目中,原子性写入至关重要,我们使用 DynamoDB 的 TransactWriteItems
操作来保证同一聚合(Aggregate,此处指白板上的单个元素)的多个事件要么全部成功,要么全部失败。
Command-Handler
的 Elixir 实现,使用了 AWS Elixir SDK。为了在 Lambda 中运行 Elixir,我们利用了社区提供的 Custom Runtime。
# lib/command_handler.ex
defmodule App.CommandHandler do
use Lambda.Handler
require Logger
# 假设我们已经配置好了 AWS SDK 和 JSON 编解码库
alias Aws.DynamoDB
@table_name System.get_env("EVENTS_TABLE_NAME")
@impl Lambda.Handler
def handle(event, _context) do
# 在生产环境中,这里的解析和验证会复杂得多
# 需要处理不同类型的命令,并进行权限校验
with {:ok, command} <- Jason.decode(event["body"]),
{:ok, validated_command} <- validate_command(command),
{:ok, events} <- command_to_events(validated_command) do
persist_events(events)
else
{:error, reason} ->
Logger.error("Failed to process command: #{inspect(reason)}")
# 返回标准的 API Gateway 错误响应
%{statusCode: 400, body: Jason.encode!(%{error: reason})}
end
end
defp validate_command(command) do
# 真实的验证逻辑:检查字段、权限、业务规则等
# 这里为了简化,我们只检查 command_id 是否存在
case Map.get(command, "command_id") do
nil -> {:error, "Missing command_id"}
_ -> {:ok, command}
end
end
defp command_to_events(%{"type" => "CreateElement", "payload" => payload} = command) do
# 将命令转换为事件。事件应该包含所有必要的信息来重建状态。
event = %{
event_id: UUID.uuid4(),
aggregate_id: payload["element_id"], # 聚合根 ID,即元素 ID
type: "ElementCreated",
version: 1, # 对于新创建的聚合,版本号总是 1
payload: payload,
timestamp: DateTime.utc_now() |> DateTime.to_iso8601(),
causation_id: command["command_id"] # 记录导致此事件的命令ID,用于追踪
}
{:ok, [event]}
end
# ... 其他命令到事件的转换逻辑 ...
defp persist_events(events) do
# 使用 TransactWriteItems 保证原子性
# 这是事件溯源实现中最关键的部分之一
requests = Enum.map(events, fn event ->
%{
put: %{
table_name: @table_name,
item: Jason.encode!(event),
# 关键:乐观锁。确保我们是在最新的聚合版本上追加事件
# 如果 aggregate_id 不存在(即新聚合),则写入成功
# 如果已存在,其 version 必须是 event.version - 1
condition_expression: "attribute_not_exists(aggregate_id) OR version = :expected_version",
expression_attribute_values: %{
":expected_version" => %{n: to_string(event.version - 1)}
}
}
}
end)
case DynamoDB.transact_write_items(requests) do
{:ok, _response} ->
Logger.info("Successfully persisted #{length(events)} events.")
# 成功响应
%{statusCode: 202, body: Jason.encode!(%{message: "Accepted"})}
{:error, %{__type: "TransactionCanceledException"} = reason} ->
# 这通常意味着版本冲突,需要客户端重试
Logger.warning("Transaction canceled, likely a concurrency conflict: #{inspect(reason)}")
%{statusCode: 409, body: Jason.encode!(%{error: "Conflict, please retry."})}
{:error, reason} ->
# 其他持久化错误,需要告警
Logger.error("Failed to persist events: #{inspect(reason)}")
%{statusCode: 500, body: Jason.encode!(%{error: "Internal server error"})}
end
end
end
这里的坑在于 condition_expression
。这是实现乐观并发控制的核心。它确保了只有当事件的版本号能够正确地接续在现有事件流之后时,写入才会成功。如果两个用户同时操作同一个元素,只有一个会成功,另一个会收到 409 Conflict
错误,客户端需要基于最新的状态重试其操作。
状态投影:构建读取模型
事件被写入 DynamoDB 后,我们如何获得可供客户端查询的当前状态?直接从事件流中实时计算是不可行的,尤其当事件数量增长时。这里,我们利用 DynamoDB Streams 和第二个 Lambda 函数 Event-Projector
来解决。
当 events
表有新的写入时,DynamoDB Streams 会捕获这些变更,并异步触发 Event-Projector
Lambda。这个函数唯一的职责就是读取新事件,更新一个或多个专门用于查询的“读取模型”(Read Model)。
我们将读取模型也存储在 DynamoDB 的一个单独的表中,例如 whiteboard_state
。这张表的结构被优化为查询友好型,例如以白板 ID 为分区键,可以直接获取整个白板的当前所有元素状态。
graph TD subgraph "客户端 (React Native)" A[User Action] --> B{Command}; end B --> C[API Gateway]; C --> D[Lambda: Command-Handler]; D -- 1. Validate & Convert --> E{Event}; E -- 2. Persist with Optimistic Locking --> F[(DynamoDB: Event Store)]; subgraph "异步投影流程" F -- 3. DynamoDB Stream --> G[Lambda: Event-Projector]; G -- 4. Apply Event --> H[(DynamoDB: Read Model)]; end subgraph "实时通知" G -- 5. Publish Update --> I[AWS IoT Core]; end I -- 6. Push Update via WebSocket --> J[React Native Client]; subgraph "查询流程" K[React Native Client] -- Query Current State --> L[API Gateway]; L --> M[Lambda: Query-Handler]; M -- Reads from --> H; end
Event-Projector
的 Elixir 代码片段:
# lib/event_projector.ex
defmodule App.EventProjector do
use Lambda.Handler
require Logger
alias Aws.DynamoDB
alias Aws.IoTDataPlane
@state_table_name System.get_env("STATE_TABLE_NAME")
@iot_topic_prefix "whiteboard/"
@impl Lambda.Handler
def handle(event, _context) do
# DynamoDB Stream 的事件体结构比较复杂
# 我们需要从 event["Records"] 中提取出新的事件数据
Enum.each(event["Records"], fn record ->
# 只处理 INSERT 事件
if record["eventName"] == "INSERT" do
# new_image 是一个 DynamoDB AttributeValue Map, 需要 unmarshall
new_image = record["dynamodb"]["NewImage"]
# 假设有一个工具函数来转换 DynamoDB 格式为 Elixir Map
event_data = unmarshall(new_image)
process_event(event_data)
end
end)
{:ok, "Projection successful"}
end
defp process_event(%{"type" => "ElementCreated", "payload" => payload} = event) do
element_id = event["aggregate_id"]
whiteboard_id = payload["whiteboard_id"]
# 读取模型:我们直接将元素状态写入 state 表
# Key: PK=WHITEBOARD#{whiteboard_id}, SK=ELEMENT#{element_id}
item = %{
"PK" => whiteboard_id,
"SK" => element_id,
"type" => "element",
"x" => payload["x"],
"y" => payload["y"],
"shape" => payload["shape"],
"last_updated" => event["timestamp"]
}
update_read_model_and_notify(whiteboard_id, item)
end
defp process_event(%{"type" => "ElementMoved", "payload" => payload} = event) do
# 对于更新操作,我们需要先读取当前状态
# 但更高效的方式是直接更新特定字段
element_id = event["aggregate_id"]
whiteboard_id = payload["whiteboard_id"] # 事件中需要冗余一些关联信息
update_expression = "SET x = :x, y = :y, last_updated = :ts"
expression_attribute_values = %{
":x" => payload["new_x"],
":y" => payload["new_y"],
":ts" => event["timestamp"]
}
# 同样可以更新读取模型并通知
# ... 实现省略 ...
end
# ... 其他事件处理逻辑 ...
defp update_read_model_and_notify(whiteboard_id, item) do
case DynamoDB.put_item(@state_table_name, item) do
{:ok, _} ->
Logger.info("Read model updated for whiteboard: #{whiteboard_id}")
# 投影成功后,通过 AWS IoT Core (MQTT over WebSocket) 推送实时更新
notify_clients(whiteboard_id, item)
{:error, reason} ->
# 关键:错误处理。如果投影失败,DynamoDB Stream 会自动重试。
# 多次失败后,事件会进入我们配置的 DLQ (Dead Letter Queue),
# 以便进行手动干预,避免数据丢失。
Logger.error("Failed to update read model: #{inspect(reason)}")
# 重新抛出异常,触发 Lambda 的重试机制
raise "Projection failed for whiteboard: #{whiteboard_id}"
end
end
defp notify_clients(whiteboard_id, payload) do
topic = @iot_topic_prefix <> whiteboard_id
case IoTDataPlane.publish(topic, Jason.encode!(payload)) do
{:ok, _} -> :ok
{:error, reason} ->
# 通知失败是次要问题,不应影响主流程。记录日志即可。
Logger.warning("Failed to notify clients on topic #{topic}: #{inspect(reason)}")
end
end
end
这个投影过程是整个架构的另一个核心。它是异步且最终一致的。这意味着从写入事件到读取模型更新之间存在短暂的延迟。对于我们的协作应用,这种毫秒级的延迟是完全可以接受的。
React Native 客户端的实现思路
客户端的责任同样重大。
命令发送与重试: 用户操作被封装成命令,通过 HTTPS 请求发送给 API Gateway。我们实现了一个命令队列,当网络不可用时,命令被暂存在本地(如使用
AsyncStorage
)。网络恢复后,队列中的命令被依次发送。每个命令都有一个唯一的 ID,以支持后端的幂等性处理。状态管理与实时更新: 客户端使用状态管理库(如 Redux 或 Zustand)来维护本地的白板状态。它通过 WebSocket 连接到 AWS IoT Core,订阅对应白板 ID 的 Topic。当
Event-Projector
推送更新时,客户端接收到新的元素状态,并直接更新本地 store,触发 UI 的重新渲染。乐观更新: 为了提升用户体验,当用户执行一个操作时,我们可以立即在本地 UI 上应用这个变更(乐观更新),同时将命令发送到后端。如果后端返回成功,一切照旧。如果返回冲突(409),我们需要回滚本地的乐观更新,并使用从 IoT Core 推送过来的最新状态来覆盖本地状态。
以下是一个简化的 React Native 端服务,用于处理与后端的交互:
// services/WhiteboardService.js
import { API } from 'aws-amplify';
import { PubSub } from 'aws-amplify';
import AsyncStorage from '@react-native-async-storage/async-storage';
import { v4 as uuidv4 } from 'uuid';
const COMMAND_QUEUE_KEY = 'command_queue';
class WhiteboardService {
constructor(whiteboardId, onStateUpdate) {
this.whiteboardId = whiteboardId;
this.apiName = 'WhiteboardAPI'; // 在 Amplify 中配置的 API 名称
this.subscription = null;
this.onStateUpdate = onStateUpdate;
}
// 订阅实时更新
subscribeToUpdates() {
const topic = `whiteboard/${this.whiteboardId}`;
this.subscription = PubSub.subscribe(topic).subscribe({
next: data => {
// 收到来自后端的投影更新
console.log('Received real-time update:', data.value);
this.onStateUpdate(data.value);
},
error: error => console.error(error),
complete: () => console.log('Subscription done'),
});
}
unsubscribe() {
if (this.subscription) {
this.subscription.unsubscribe();
}
}
// 发送命令,支持离线队列
async sendCommand(type, payload) {
const command = {
command_id: uuidv4(),
whiteboard_id: this.whiteboardId,
type,
payload,
timestamp: new Date().toISOString(),
};
try {
// 这里的 API.post 会调用 API Gateway -> Command-Handler Lambda
const response = await API.post(this.apiName, '/commands', {
body: command,
});
console.log('Command accepted:', response);
this.processQueuedCommands(); // 发送成功后,尝试处理队列中的其他命令
return { success: true };
} catch (error) {
console.error('Failed to send command:', error);
// 网络错误或其他临时性问题,将命令加入队列
if (error.response?.status !== 409 && error.response?.status !== 400) {
await this.queueCommand(command);
}
return { success: false, error };
}
}
async queueCommand(command) {
const queue = JSON.parse(await AsyncStorage.getItem(COMMAND_QUEUE_KEY) || '[]');
queue.push(command);
await AsyncStorage.setItem(COMMAND_QUEUE_KEY, JSON.stringify(queue));
}
// 在应用启动或网络恢复时调用
async processQueuedCommands() {
let queue = JSON.parse(await AsyncStorage.getItem(COMMAND_QUEUE_KEY) || '[]');
if (queue.length === 0) return;
const commandToSend = queue.shift();
await AsyncStorage.setItem(COMMAND_QUEUE_KEY, JSON.stringify(queue));
// 重新发送
await this.sendCommand(commandToSend.type, commandToSend.payload);
}
}
export default WhiteboardService;
局限性与未来迭代方向
这套架构并非银弹。在 Sprint 的回顾会议中,我们也识别出了一些潜在的复杂性和需要持续关注的点。
首先,Elixir on Lambda 的冷启动问题是真实存在的。对于需要极低延迟的交互,首次调用的性能可能不理想。我们通过配置 Lambda 的 Provisioned Concurrency 在一定程度上缓解了这个问题,但这带来了额外的成本。一个潜在的优化是,对于写操作,或许可以考虑使用常驻的 Fargate 服务而非 Lambda,只将异步的、非关键路径的投影工作放在 Lambda 中。
其次,事件模型的设计和演进是一个长期挑战。一旦事件被写入,它们就是不可变的。如果业务逻辑变化导致需要新的事件结构,我们就必须处理版本兼容问题。通常采用的策略是版本化事件,并在投影逻辑中处理不同版本的事件,但这无疑增加了代码的复杂性。
最后,对于具有极长生命周期的聚合(例如一个使用了数年的白板),从头重放所有事件来重建状态会变得非常缓慢。虽然我们的读取模型避免了查询时的重放,但在调试、迁移或重建新的读取模型时,这个问题会暴露出来。未来的迭代计划中,我们已经加入了一个技术故事:实现“快照”(Snapshotting)机制。即定期将聚合的当前状态保存为一个快照,在重建时,只需从最新的快照开始,重放后续的少量事件即可。