技术痛点:失控的看板业务逻辑
我们的看板(Kanban)系统后端服务开始变得臃肿。最初,一个核心的业务规则——“WIP(Work-In-Progress)限制”,即限制某一列中卡片的数量——是直接实现在卡片移动的 API 端点里的。逻辑很简单:在移动卡片之前,先用一个 SQL 查询目标列的卡片总数,如果 count >= WIP_LIMIT
,就拒绝操作。
// 早期在后端服务中的实现方式 (伪代码)
func MoveCardHandler(w http.ResponseWriter, r *http.Request) {
// ... 解析请求,获取 cardID 和 targetColumnID ...
// 1. 开启事务
tx, err := db.BeginTx(ctx, nil)
// ... 错误处理 ...
// 2. 检查WIP限制
var currentWIP int
err = tx.QueryRowContext(ctx, "SELECT COUNT(*) FROM cards WHERE column_id = ?", targetColumnID).Scan(¤tWIP)
// ... 错误处理 ...
var wipLimit int
err = tx.QueryRowContext(ctx, "SELECT wip_limit FROM columns WHERE id = ?", targetColumnID).Scan(&wipLimit)
// ... 错误处理 ...
if currentWIP >= wipLimit {
http.Error(w, "WIP limit exceeded", http.StatusForbidden)
tx.Rollback()
return
}
// 3. 执行卡片移动
_, err = tx.ExecContext(ctx, "UPDATE cards SET column_id = ? WHERE id = ?", targetColumnID, cardID)
// ... 错误处理 ...
// 4. 提交事务
tx.Commit()
w.WriteHeader(http.StatusOK)
}
这个模式在系统初期运行良好。但随着业务发展,新的规则不断涌现:特定类型的卡片不受 WIP 限制、VIP 用户可以临时超出限制、某些列的 WIP 限制依赖于其他列的状态等等。这些校验逻辑与核心的数据库操作(UPDATE cards
)纠缠在一起,让 MoveCardHandler
变得难以测试和维护。更糟糕的是,其他多个 API 端点(如“创建卡片”、“批量移动”)也需要重复实现这套复杂的校验逻辑。这违反了 DRY 原则,并成了滋生 bug 的温床。
初步构想:将校验逻辑前置到 API 网关
问题的核心在于,业务规则校验(Authorization & Validation)和核心业务执行(Execution)被耦合在了同一个地方。理想的架构应该将它们分离。既然所有请求都通过我们的 API 网关 Tyk,那么是否可以在 Tyk 层面,也就是请求到达后端服务之前,就完成 WIP 限制的校验?
这个构想的优势显而易见:
- 逻辑集中化: 所有 WIP 相关的校验逻辑都集中在一个地方,易于维护和迭代。
- 服务解耦: 后端服务可以专注于核心的 CRUD 操作,变得更加纯粹。
- 性能优势: 对于不满足校验的请求,可以直接在网关层拒绝,避免了对后端服务的无效请求,节省了计算资源。
Tyk 提供了多种中间件机制,包括 Python/JS 插件、gRPC 插件和原生 Go 插件。考虑到 WIP 校验是一个同步且阻塞的操作(必须校验通过后才能放行请求),并且对性能要求极高,因为它位于请求的关键路径上。原生 Go 插件无疑是最佳选择,它能提供近乎本地代码的执行效率。
技术选型与实现步骤
我们的目标是创建一个 Tyk Go 插件,它会在处理特定 API 请求时触发,执行以下操作:
- 解析传入的请求体,获取
cardId
和targetColumnId
。 - 连接到一个 SQL 数据库(例如 PostgreSQL)。
- 查询数据库,获取目标列的当前卡片数量和 WIP 限制。
- 执行校验逻辑。
- 如果校验失败,插件将直接返回一个
403 Forbidden
响应。 - 如果校验通过,插件将请求放行至上游的后端服务。
步骤一:搭建基础 Go 插件项目结构
首先,我们需要一个 Go 项目,并引入 Tyk 网关的依赖。
go.mod
:
module tyk-kanban-wip-validator
go 1.20
require github.com/TykTechnologies/tyk v5.3.0
插件的入口点是一个名为 WipCheck
的函数。Tyk 会在 API 定义中配置好这个中间件后,在请求处理流程的特定阶段调用它。
plugin.go
:
package main
import (
"context"
"database/sql"
"encoding/json"
"io"
"log"
"net/http"
"github.com/TykTechnologies/tyk/ctx"
"github.com/TykTechnologies/tyk/gateway"
"github.com/TykTechnologies/tyk/user"
)
// 全局数据库连接池,这是关键。
// 在真实项目中,不能在每次请求处理时都创建新连接。
var db *sql.DB
// MoveCardRequest 定义了插件需要从请求体中解析的结构
type MoveCardRequest struct {
TargetColumnID int `json:"targetColumnId"`
}
// init 函数在插件加载时执行,是初始化数据库连接池的理想位置。
func init() {
log.Println("Initialising Tyk Kanban WIP Validator plugin")
// 这里的数据库连接字符串应该从环境变量或配置文件中读取,
// 为演示方便在此硬编码。
connStr := "postgres://user:password@localhost:5432/kanban_db?sslmode=disable"
var err error
db, err = sql.Open("postgres", connStr)
if err != nil {
log.Fatalf("FATAL: Failed to open DB connection: %v", err)
}
// 配置连接池参数
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(25)
db.SetConnMaxLifetime(5 * time.Minute)
// 尝试ping数据库以验证连接
if err = db.Ping(); err != nil {
log.Fatalf("FATAL: Failed to ping DB: %v", err)
}
log.Println("DB connection pool established.")
}
// WipCheck 是我们的中间件处理函数
func WipCheck(rw http.ResponseWriter, r *http.Request) {
// 基础实现,后续会填充逻辑
log.Println("WIP Check middleware triggered")
}
func main() {} // Go 插件需要一个 main 函数,但通常是空的
为了让 Tyk 加载这个插件,我们需要一个 manifest.json
文件:
manifest.json
:
{
"file_list": [
"plugin.go",
"go.mod"
],
"custom_middleware": {
"driver": "goplugin",
"pre": [
{
"name": "WipCheck",
"path": "plugin.so",
"require_session": false
}
]
}
}
将 Go 项目编译成共享对象文件 (.so
),并配置到 Tyk API 定义中,即可完成第一步集成。
步骤二:实现核心 SQL 校验逻辑
现在来填充 WipCheck
函数的细节。这里的坑在于:插件运行时,请求体 r.Body
是一个 io.ReadCloser
,只能被读取一次。如果我们的插件读取了它,那么下游的后端服务就读不到了。正确的做法是读取后,用一个新的 io.ReadCloser
替换掉原来的 r.Body
。
// WipCheck 更新后的实现
func WipCheck(rw http.ResponseWriter, r *http.Request) {
session := ctx.GetSession(r)
if session == nil {
// 理论上不应该发生,但做好防御性编程
log.Println("WARN: Could not get session from context")
return
}
// 1. 读取并缓存请求体
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
log.Printf("ERROR: Failed to read request body: %v", err)
rw.WriteHeader(http.StatusInternalServerError)
return
}
r.Body.Close() // 及时关闭
// 2. 将字节流重新包装成 io.ReadCloser,以便下游服务可以再次读取
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
// 3. 解析请求体
var reqPayload MoveCardRequest
if err := json.Unmarshal(bodyBytes, &reqPayload); err != nil {
log.Printf("WARN: Failed to unmarshal request body: %v. Passing through.", err)
// 如果请求体格式不符,可能不是我们关心的API,直接放行
return
}
// 如果 targetColumnId 为0或负数,说明解析失败或请求无效
if reqPayload.TargetColumnID <= 0 {
return
}
log.Printf("INFO: Validating WIP for target column: %d", reqPayload.TargetColumnID)
// 4. 执行数据库查询
// 使用带超时的 context 是生产级代码的标配
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
var currentWIP, wipLimit int
// 在一个事务中执行两个查询,保证数据的一致性
tx, err := db.BeginTx(ctx, nil)
if err != nil {
log.Printf("ERROR: Failed to begin transaction: %v", err)
// 数据库连接问题,选择 fail-open 策略,记录错误后放行,避免阻塞业务
// 在某些高安全场景下,可能会选择 fail-close 策略
return
}
defer tx.Rollback() // 保证事务在函数退出时回滚(如果未提交)
// 查询当前WIP
err = tx.QueryRowContext(ctx, "SELECT COUNT(*) FROM cards WHERE column_id = $1", reqPayload.TargetColumnID).Scan(¤tWIP)
if err != nil {
if err == sql.ErrNoRows {
currentWIP = 0
} else {
log.Printf("ERROR: Failed to query current WIP: %v", err)
return // Fail-open
}
}
// 查询WIP Limit
err = tx.QueryRowContext(ctx, "SELECT wip_limit FROM columns WHERE id = $1", reqPayload.TargetColumnID).Scan(&wipLimit)
if err != nil {
log.Printf("ERROR: Failed to query WIP limit: %v", err)
return // Fail-open
}
// 5. 执行校验逻辑
// wipLimit <= 0 表示无限制
if wipLimit > 0 && currentWIP >= wipLimit {
log.Printf("INFO: WIP limit exceeded for column %d. Current: %d, Limit: %d. Rejecting request.", reqPayload.TargetColumnID, currentWIP, wipLimit)
// 关键:重写响应并告知 Tyk 结束请求处理
rw.Header().Set("Content-Type", "application/json")
rw.WriteHeader(http.StatusForbidden)
// 构造一个有意义的错误响应
errorResponse := map[string]string{"error": "WIP limit exceeded"}
jsonResponse, _ := json.Marshal(errorResponse)
rw.Write(jsonResponse)
// 使用 Tyk 特定的上下文对象来设置一个标志,告诉网关这个请求已经被处理,不要再转发到上游
session.SetMiddlewareError("WIP limit exceeded")
return
}
log.Printf("INFO: WIP check passed for column %d", reqPayload.TargetColumnID)
// 如果校验通过,函数正常返回,请求会继续流转到下一个中间件或上游服务
}
这个版本的实现已经可以工作,但在真实项目中,它存在一个致命的性能瓶颈:每一次 API 调用都会触发至少两次 SQL 查询。对于高频操作的看板系统,这会给数据库带来巨大压力。
步骤三:引入缓存策略,优化性能
为了解决性能问题,我们可以在插件中引入一个内存缓存。每次查询 WIP 限制时,先查缓存,缓存未命中再去查数据库,并将结果存入缓存。
我们将使用一个简单的带 TTL 的内存缓存库,例如 patrickmn/go-cache
。
go.mod
中增加依赖:
require (
github.com/TykTechnologies/tyk v5.3.0
github.com/patrickmn/go-cache v2.1.0+incompatible
)
plugin.go
中进行改造:
// ... import 部分增加 "github.com/patrickmn/go-cache" ...
// 全局变量区
var db *sql.DB
var wipCache *cache.Cache
// init 函数中增加缓存初始化
func init() {
// ... 数据库初始化代码 ...
log.Println("Initialising in-memory cache")
// 创建一个缓存,默认过期时间5分钟,每10分钟清理一次过期项目
wipCache = cache.New(5*time.Minute, 10*time.Minute)
}
// getWipLimitFromDbOrCache 是新的数据获取函数
func getWipLimitFromDbOrCache(ctx context.Context, columnID int) (int, error) {
cacheKey := fmt.Sprintf("wip_limit_%d", columnID)
// 1. 尝试从缓存获取
if limit, found := wipCache.Get(cacheKey); found {
log.Printf("DEBUG: Cache HIT for key: %s", cacheKey)
return limit.(int), nil
}
log.Printf("DEBUG: Cache MISS for key: %s. Fetching from DB.", cacheKey)
// 2. 缓存未命中,查询数据库
var wipLimit int
err := db.QueryRowContext(ctx, "SELECT wip_limit FROM columns WHERE id = $1", columnID).Scan(&wipLimit)
if err != nil {
return 0, err
}
// 3. 将结果存入缓存
wipCache.Set(cacheKey, wipLimit, cache.DefaultExpiration)
return wipLimit, nil
}
// WipCheck 函数中的查询部分需要被重构
func WipCheck(rw http.ResponseWriter, r *http.Request) {
// ... 请求体解析部分保持不变 ...
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
// 注意:currentWIP 是实时变化的,不能被缓存。
// 我们只缓存相对静态的 wipLimit。
var currentWIP int
err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM cards WHERE column_id = $1", reqPayload.TargetColumnID).Scan(¤tWIP)
if err != nil {
log.Printf("ERROR: Failed to query current WIP: %v", err)
return // Fail-open
}
wipLimit, err := getWipLimitFromDbOrCache(ctx, reqPayload.TargetColumnID)
if err != nil {
log.Printf("ERROR: Failed to get WIP limit: %v", err)
return // Fail-open
}
// ... 校验逻辑保持不变 ...
}
通过引入缓存,我们将对 columns
表的读操作压力显著降低。这是一种常见的优化策略,但它也引入了新的问题:数据一致性。如果管理员在后台修改了某列的 WIP 限制,缓存中的数据在过期之前都是旧的。解决这个问题通常有两种方案:
- 接受最终一致性,缩短缓存 TTL。对于 WIP 限制这种变更不频繁的数据,5分钟的延迟通常是可以接受的。
- 实现主动缓存失效。当
columns
表发生变更时,通过某种机制(如数据库触发器+消息队列)通知所有 Tyk 网关节点清除对应的缓存。这种方案更复杂,但一致性更高。
对于我们的场景,方案1是更务实的选择。
最终成果:一个可配置、高性能的业务校验插件
为了让插件更具通用性,硬编码的配置(如数据库连接串、缓存 TTL)都应该外部化。Tyk 允许在 API 定义中为自定义中间件提供 config_data
。
Tyk API Definition (JSON)
:
{
"custom_middleware": {
"pre": [
{
"name": "WipCheck",
"path": "plugin.so"
}
],
"driver": "goplugin",
"config_data": {
"connection_string": "postgres://user:password@host:port/db?sslmode=disable",
"cache_ttl_seconds": 300,
"cache_cleanup_interval_seconds": 600
}
}
}
我们的 init
函数需要被修改以读取这些配置。Tyk Go 插件的配置在插件加载时通过一个特殊的方式注入,通常需要定义一个 LoadConfig
函数。
最终,请求处理的完整流程可以用下面的时序图来表示:
sequenceDiagram participant Client participant Tyk Gateway participant Go Plugin (WipCheck) participant Cache (In-Memory) participant SQL Database participant Upstream API Client->>+Tyk Gateway: POST /moveCard (targetColumnId: 3) Tyk Gateway->>+Go Plugin (WipCheck): Invoke middleware Go Plugin (WipCheck)->>Go Plugin (WipCheck): Parse request body Go Plugin (WipCheck)->>+SQL Database: SELECT COUNT(*) FROM cards... SQL Database-->>-Go Plugin (WipCheck): Returns currentWIP = 4 Go Plugin (WipCheck)->>+Cache (In-Memory): GET wip_limit_3 Cache (In-Memory)-->>-Go Plugin (WipCheck): MISS Go Plugin (WipCheck)->>+SQL Database: SELECT wip_limit FROM columns... SQL Database-->>-Go Plugin (WipCheck): Returns wipLimit = 5 Go Plugin (WipCheck)->>+Cache (In-Memory): SET wip_limit_3 = 5 (TTL 5m) Cache (In-Memory)-->>-Go Plugin (WipCheck): OK Go Plugin (WipCheck)->>Go Plugin (WipCheck): Validate (4 < 5 -> true) Go Plugin (WipCheck)-->>-Tyk Gateway: Pass request Tyk Gateway->>+Upstream API: Forward original request Upstream API-->>-Tyk Gateway: 200 OK Tyk Gateway-->>-Client: 200 OK %% --- Scenario: WIP Limit Exceeded --- Client->>+Tyk Gateway: POST /moveCard (targetColumnId: 3) Tyk Gateway->>+Go Plugin (WipCheck): Invoke middleware Go Plugin (WipCheck)->>+SQL Database: SELECT COUNT(*) FROM cards... SQL Database-->>-Go Plugin (WipCheck): Returns currentWIP = 5 Go Plugin (WipCheck)->>+Cache (In-Memory): GET wip_limit_3 Cache (In-Memory)-->>-Go Plugin (WipCheck): HIT, returns 5 Go Plugin (WipCheck)->>Go Plugin (WipCheck): Validate (5 >= 5 -> true) Go Plugin (WipCheck)->>Tyk Gateway: Set MiddlewareError & Write 403 response Tyk Gateway-->>-Client: 403 Forbidden ({"error":"WIP limit exceeded"}) Note right of Tyk Gateway: Request does not reach Upstream API
局限性与未来迭代路径
当前方案将复杂的业务校验逻辑成功地从后端服务中剥离出来,集中在 API 网关层面进行处理,并通过缓存机制保证了高性能。然而,它并非银弹,存在一些局限性:
缓存一致性与分布式问题: 我们使用的是节点本地的内存缓存。在 Tyk 网关集群部署时,每个节点都有一份独立的缓存。这不仅会造成数据冗余,更严重的是,当一个节点的缓存因为 TTL 过期而去数据库更新了数据后,其他节点的缓存依然是旧的。一个可行的优化路径是切换到分布式缓存,如 Redis,以保证所有网关节点共享同一份缓存视图。
插件与业务的耦合: 虽然解耦了后端服务,但插件本身现在与看板的业务规则(特别是数据库 schema)紧密耦合。如果
cards
或columns
表的结构发生变化,插件代码也需要同步修改、编译和部署。这增加了维护的复杂度。对于更复杂的场景,可以考虑让插件调用一个专门的、轻量级的“规则服务”,而不是直接连接数据库。Fail-open 策略的风险: 当前的错误处理(如数据库查询失败)采用的是 fail-open 策略,即放行请求。这保证了系统的可用性,但在某些对规则执行有严格要求的场景下,可能会导致数据不一致(例如,暂时超出了 WIP 限制)。在未来的迭代中,可以引入基于配置的策略切换,允许针对不同 API 或环境选择 fail-open 或 fail-close。