利用 Tyk Go 插件实现基于 SQL 的同步业务规则校验与数据处理


技术痛点:失控的看板业务逻辑

我们的看板(Kanban)系统后端服务开始变得臃肿。最初,一个核心的业务规则——“WIP(Work-In-Progress)限制”,即限制某一列中卡片的数量——是直接实现在卡片移动的 API 端点里的。逻辑很简单:在移动卡片之前,先用一个 SQL 查询目标列的卡片总数,如果 count >= WIP_LIMIT,就拒绝操作。

// 早期在后端服务中的实现方式 (伪代码)
func MoveCardHandler(w http.ResponseWriter, r *http.Request) {
    // ... 解析请求,获取 cardID 和 targetColumnID ...

    // 1. 开启事务
    tx, err := db.BeginTx(ctx, nil)
    // ... 错误处理 ...

    // 2. 检查WIP限制
    var currentWIP int
    err = tx.QueryRowContext(ctx, "SELECT COUNT(*) FROM cards WHERE column_id = ?", targetColumnID).Scan(&currentWIP)
    // ... 错误处理 ...

    var wipLimit int
    err = tx.QueryRowContext(ctx, "SELECT wip_limit FROM columns WHERE id = ?", targetColumnID).Scan(&wipLimit)
    // ... 错误处理 ...

    if currentWIP >= wipLimit {
        http.Error(w, "WIP limit exceeded", http.StatusForbidden)
        tx.Rollback()
        return
    }

    // 3. 执行卡片移动
    _, err = tx.ExecContext(ctx, "UPDATE cards SET column_id = ? WHERE id = ?", targetColumnID, cardID)
    // ... 错误处理 ...

    // 4. 提交事务
    tx.Commit()
    w.WriteHeader(http.StatusOK)
}

这个模式在系统初期运行良好。但随着业务发展,新的规则不断涌现:特定类型的卡片不受 WIP 限制、VIP 用户可以临时超出限制、某些列的 WIP 限制依赖于其他列的状态等等。这些校验逻辑与核心的数据库操作(UPDATE cards)纠缠在一起,让 MoveCardHandler 变得难以测试和维护。更糟糕的是,其他多个 API 端点(如“创建卡片”、“批量移动”)也需要重复实现这套复杂的校验逻辑。这违反了 DRY 原则,并成了滋生 bug 的温床。

初步构想:将校验逻辑前置到 API 网关

问题的核心在于,业务规则校验(Authorization & Validation)和核心业务执行(Execution)被耦合在了同一个地方。理想的架构应该将它们分离。既然所有请求都通过我们的 API 网关 Tyk,那么是否可以在 Tyk 层面,也就是请求到达后端服务之前,就完成 WIP 限制的校验?

这个构想的优势显而易见:

  1. 逻辑集中化: 所有 WIP 相关的校验逻辑都集中在一个地方,易于维护和迭代。
  2. 服务解耦: 后端服务可以专注于核心的 CRUD 操作,变得更加纯粹。
  3. 性能优势: 对于不满足校验的请求,可以直接在网关层拒绝,避免了对后端服务的无效请求,节省了计算资源。

Tyk 提供了多种中间件机制,包括 Python/JS 插件、gRPC 插件和原生 Go 插件。考虑到 WIP 校验是一个同步且阻塞的操作(必须校验通过后才能放行请求),并且对性能要求极高,因为它位于请求的关键路径上。原生 Go 插件无疑是最佳选择,它能提供近乎本地代码的执行效率。

技术选型与实现步骤

我们的目标是创建一个 Tyk Go 插件,它会在处理特定 API 请求时触发,执行以下操作:

  1. 解析传入的请求体,获取 cardIdtargetColumnId
  2. 连接到一个 SQL 数据库(例如 PostgreSQL)。
  3. 查询数据库,获取目标列的当前卡片数量和 WIP 限制。
  4. 执行校验逻辑。
  5. 如果校验失败,插件将直接返回一个 403 Forbidden 响应。
  6. 如果校验通过,插件将请求放行至上游的后端服务。

步骤一:搭建基础 Go 插件项目结构

首先,我们需要一个 Go 项目,并引入 Tyk 网关的依赖。

go.mod:

module tyk-kanban-wip-validator

go 1.20

require github.com/TykTechnologies/tyk v5.3.0

插件的入口点是一个名为 WipCheck 的函数。Tyk 会在 API 定义中配置好这个中间件后,在请求处理流程的特定阶段调用它。

plugin.go:

package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"io"
	"log"
	"net/http"

	"github.com/TykTechnologies/tyk/ctx"
	"github.com/TykTechnologies/tyk/gateway"
	"github.com/TykTechnologies/tyk/user"
)

// 全局数据库连接池,这是关键。
// 在真实项目中,不能在每次请求处理时都创建新连接。
var db *sql.DB

// MoveCardRequest 定义了插件需要从请求体中解析的结构
type MoveCardRequest struct {
	TargetColumnID int `json:"targetColumnId"`
}

// init 函数在插件加载时执行,是初始化数据库连接池的理想位置。
func init() {
	log.Println("Initialising Tyk Kanban WIP Validator plugin")

	// 这里的数据库连接字符串应该从环境变量或配置文件中读取,
	// 为演示方便在此硬编码。
	connStr := "postgres://user:password@localhost:5432/kanban_db?sslmode=disable"
	var err error
	db, err = sql.Open("postgres", connStr)
	if err != nil {
		log.Fatalf("FATAL: Failed to open DB connection: %v", err)
	}

	// 配置连接池参数
	db.SetMaxOpenConns(25)
	db.SetMaxIdleConns(25)
	db.SetConnMaxLifetime(5 * time.Minute)

	// 尝试ping数据库以验证连接
	if err = db.Ping(); err != nil {
		log.Fatalf("FATAL: Failed to ping DB: %v", err)
	}

	log.Println("DB connection pool established.")
}

// WipCheck 是我们的中间件处理函数
func WipCheck(rw http.ResponseWriter, r *http.Request) {
    // 基础实现,后续会填充逻辑
    log.Println("WIP Check middleware triggered")
}

func main() {} // Go 插件需要一个 main 函数,但通常是空的

为了让 Tyk 加载这个插件,我们需要一个 manifest.json 文件:

manifest.json:

{
    "file_list": [
        "plugin.go",
        "go.mod"
    ],
    "custom_middleware": {
        "driver": "goplugin",
        "pre": [
            {
                "name": "WipCheck",
                "path": "plugin.so",
                "require_session": false
            }
        ]
    }
}

将 Go 项目编译成共享对象文件 (.so),并配置到 Tyk API 定义中,即可完成第一步集成。

步骤二:实现核心 SQL 校验逻辑

现在来填充 WipCheck 函数的细节。这里的坑在于:插件运行时,请求体 r.Body 是一个 io.ReadCloser,只能被读取一次。如果我们的插件读取了它,那么下游的后端服务就读不到了。正确的做法是读取后,用一个新的 io.ReadCloser 替换掉原来的 r.Body

// WipCheck 更新后的实现
func WipCheck(rw http.ResponseWriter, r *http.Request) {
	session := ctx.GetSession(r)
	if session == nil {
		// 理论上不应该发生,但做好防御性编程
		log.Println("WARN: Could not get session from context")
		return
	}

	// 1. 读取并缓存请求体
	bodyBytes, err := io.ReadAll(r.Body)
	if err != nil {
		log.Printf("ERROR: Failed to read request body: %v", err)
		rw.WriteHeader(http.StatusInternalServerError)
		return
	}
	r.Body.Close() // 及时关闭

	// 2. 将字节流重新包装成 io.ReadCloser,以便下游服务可以再次读取
	r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))

	// 3. 解析请求体
	var reqPayload MoveCardRequest
	if err := json.Unmarshal(bodyBytes, &reqPayload); err != nil {
		log.Printf("WARN: Failed to unmarshal request body: %v. Passing through.", err)
		// 如果请求体格式不符,可能不是我们关心的API,直接放行
		return
	}
	
	// 如果 targetColumnId 为0或负数,说明解析失败或请求无效
	if reqPayload.TargetColumnID <= 0 {
		return
	}

	log.Printf("INFO: Validating WIP for target column: %d", reqPayload.TargetColumnID)
	
	// 4. 执行数据库查询
	// 使用带超时的 context 是生产级代码的标配
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	var currentWIP, wipLimit int

	// 在一个事务中执行两个查询,保证数据的一致性
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		log.Printf("ERROR: Failed to begin transaction: %v", err)
		// 数据库连接问题,选择 fail-open 策略,记录错误后放行,避免阻塞业务
		// 在某些高安全场景下,可能会选择 fail-close 策略
		return
	}
	defer tx.Rollback() // 保证事务在函数退出时回滚(如果未提交)

	// 查询当前WIP
	err = tx.QueryRowContext(ctx, "SELECT COUNT(*) FROM cards WHERE column_id = $1", reqPayload.TargetColumnID).Scan(&currentWIP)
	if err != nil {
		if err == sql.ErrNoRows {
			currentWIP = 0
		} else {
			log.Printf("ERROR: Failed to query current WIP: %v", err)
			return // Fail-open
		}
	}

	// 查询WIP Limit
	err = tx.QueryRowContext(ctx, "SELECT wip_limit FROM columns WHERE id = $1", reqPayload.TargetColumnID).Scan(&wipLimit)
	if err != nil {
		log.Printf("ERROR: Failed to query WIP limit: %v", err)
		return // Fail-open
	}

	// 5. 执行校验逻辑
	// wipLimit <= 0 表示无限制
	if wipLimit > 0 && currentWIP >= wipLimit {
		log.Printf("INFO: WIP limit exceeded for column %d. Current: %d, Limit: %d. Rejecting request.", reqPayload.TargetColumnID, currentWIP, wipLimit)

		// 关键:重写响应并告知 Tyk 结束请求处理
		rw.Header().Set("Content-Type", "application/json")
		rw.WriteHeader(http.StatusForbidden)
		
		// 构造一个有意义的错误响应
		errorResponse := map[string]string{"error": "WIP limit exceeded"}
		jsonResponse, _ := json.Marshal(errorResponse)
		rw.Write(jsonResponse)

		// 使用 Tyk 特定的上下文对象来设置一个标志,告诉网关这个请求已经被处理,不要再转发到上游
		session.SetMiddlewareError("WIP limit exceeded")
		return
	}

	log.Printf("INFO: WIP check passed for column %d", reqPayload.TargetColumnID)
	// 如果校验通过,函数正常返回,请求会继续流转到下一个中间件或上游服务
}

这个版本的实现已经可以工作,但在真实项目中,它存在一个致命的性能瓶颈:每一次 API 调用都会触发至少两次 SQL 查询。对于高频操作的看板系统,这会给数据库带来巨大压力。

步骤三:引入缓存策略,优化性能

为了解决性能问题,我们可以在插件中引入一个内存缓存。每次查询 WIP 限制时,先查缓存,缓存未命中再去查数据库,并将结果存入缓存。

我们将使用一个简单的带 TTL 的内存缓存库,例如 patrickmn/go-cache

go.mod 中增加依赖:

require (
	github.com/TykTechnologies/tyk v5.3.0
	github.com/patrickmn/go-cache v2.1.0+incompatible
)

plugin.go 中进行改造:

// ... import 部分增加 "github.com/patrickmn/go-cache" ...

// 全局变量区
var db *sql.DB
var wipCache *cache.Cache

// init 函数中增加缓存初始化
func init() {
	// ... 数据库初始化代码 ...

	log.Println("Initialising in-memory cache")
	// 创建一个缓存,默认过期时间5分钟,每10分钟清理一次过期项目
	wipCache = cache.New(5*time.Minute, 10*time.Minute)
}

// getWipLimitFromDbOrCache 是新的数据获取函数
func getWipLimitFromDbOrCache(ctx context.Context, columnID int) (int, error) {
	cacheKey := fmt.Sprintf("wip_limit_%d", columnID)

	// 1. 尝试从缓存获取
	if limit, found := wipCache.Get(cacheKey); found {
		log.Printf("DEBUG: Cache HIT for key: %s", cacheKey)
		return limit.(int), nil
	}

	log.Printf("DEBUG: Cache MISS for key: %s. Fetching from DB.", cacheKey)
	
	// 2. 缓存未命中,查询数据库
	var wipLimit int
	err := db.QueryRowContext(ctx, "SELECT wip_limit FROM columns WHERE id = $1", columnID).Scan(&wipLimit)
	if err != nil {
		return 0, err
	}

	// 3. 将结果存入缓存
	wipCache.Set(cacheKey, wipLimit, cache.DefaultExpiration)
	
	return wipLimit, nil
}

// WipCheck 函数中的查询部分需要被重构
func WipCheck(rw http.ResponseWriter, r *http.Request) {
	// ... 请求体解析部分保持不变 ...

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	// 注意:currentWIP 是实时变化的,不能被缓存。
	// 我们只缓存相对静态的 wipLimit。
	var currentWIP int
	err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM cards WHERE column_id = $1", reqPayload.TargetColumnID).Scan(&currentWIP)
	if err != nil {
		log.Printf("ERROR: Failed to query current WIP: %v", err)
		return // Fail-open
	}

	wipLimit, err := getWipLimitFromDbOrCache(ctx, reqPayload.TargetColumnID)
	if err != nil {
		log.Printf("ERROR: Failed to get WIP limit: %v", err)
		return // Fail-open
	}
	
	// ... 校验逻辑保持不变 ...
}

通过引入缓存,我们将对 columns 表的读操作压力显著降低。这是一种常见的优化策略,但它也引入了新的问题:数据一致性。如果管理员在后台修改了某列的 WIP 限制,缓存中的数据在过期之前都是旧的。解决这个问题通常有两种方案:

  1. 接受最终一致性,缩短缓存 TTL。对于 WIP 限制这种变更不频繁的数据,5分钟的延迟通常是可以接受的。
  2. 实现主动缓存失效。当 columns 表发生变更时,通过某种机制(如数据库触发器+消息队列)通知所有 Tyk 网关节点清除对应的缓存。这种方案更复杂,但一致性更高。

对于我们的场景,方案1是更务实的选择。

最终成果:一个可配置、高性能的业务校验插件

为了让插件更具通用性,硬编码的配置(如数据库连接串、缓存 TTL)都应该外部化。Tyk 允许在 API 定义中为自定义中间件提供 config_data

Tyk API Definition (JSON):

{
  "custom_middleware": {
    "pre": [
      {
        "name": "WipCheck",
        "path": "plugin.so"
      }
    ],
    "driver": "goplugin",
    "config_data": {
        "connection_string": "postgres://user:password@host:port/db?sslmode=disable",
        "cache_ttl_seconds": 300,
        "cache_cleanup_interval_seconds": 600
    }
  }
}

我们的 init 函数需要被修改以读取这些配置。Tyk Go 插件的配置在插件加载时通过一个特殊的方式注入,通常需要定义一个 LoadConfig 函数。

最终,请求处理的完整流程可以用下面的时序图来表示:

sequenceDiagram
    participant Client
    participant Tyk Gateway
    participant Go Plugin (WipCheck)
    participant Cache (In-Memory)
    participant SQL Database
    participant Upstream API

    Client->>+Tyk Gateway: POST /moveCard (targetColumnId: 3)
    Tyk Gateway->>+Go Plugin (WipCheck): Invoke middleware
    Go Plugin (WipCheck)->>Go Plugin (WipCheck): Parse request body
    Go Plugin (WipCheck)->>+SQL Database: SELECT COUNT(*) FROM cards...
    SQL Database-->>-Go Plugin (WipCheck): Returns currentWIP = 4
    Go Plugin (WipCheck)->>+Cache (In-Memory): GET wip_limit_3
    Cache (In-Memory)-->>-Go Plugin (WipCheck): MISS
    Go Plugin (WipCheck)->>+SQL Database: SELECT wip_limit FROM columns...
    SQL Database-->>-Go Plugin (WipCheck): Returns wipLimit = 5
    Go Plugin (WipCheck)->>+Cache (In-Memory): SET wip_limit_3 = 5 (TTL 5m)
    Cache (In-Memory)-->>-Go Plugin (WipCheck): OK
    Go Plugin (WipCheck)->>Go Plugin (WipCheck): Validate (4 < 5 -> true)
    Go Plugin (WipCheck)-->>-Tyk Gateway: Pass request
    Tyk Gateway->>+Upstream API: Forward original request
    Upstream API-->>-Tyk Gateway: 200 OK
    Tyk Gateway-->>-Client: 200 OK

    %% --- Scenario: WIP Limit Exceeded ---
    Client->>+Tyk Gateway: POST /moveCard (targetColumnId: 3)
    Tyk Gateway->>+Go Plugin (WipCheck): Invoke middleware
    Go Plugin (WipCheck)->>+SQL Database: SELECT COUNT(*) FROM cards...
    SQL Database-->>-Go Plugin (WipCheck): Returns currentWIP = 5
    Go Plugin (WipCheck)->>+Cache (In-Memory): GET wip_limit_3
    Cache (In-Memory)-->>-Go Plugin (WipCheck): HIT, returns 5
    Go Plugin (WipCheck)->>Go Plugin (WipCheck): Validate (5 >= 5 -> true)
    Go Plugin (WipCheck)->>Tyk Gateway: Set MiddlewareError & Write 403 response
    Tyk Gateway-->>-Client: 403 Forbidden ({"error":"WIP limit exceeded"})
    Note right of Tyk Gateway: Request does not reach Upstream API

局限性与未来迭代路径

当前方案将复杂的业务校验逻辑成功地从后端服务中剥离出来,集中在 API 网关层面进行处理,并通过缓存机制保证了高性能。然而,它并非银弹,存在一些局限性:

  1. 缓存一致性与分布式问题: 我们使用的是节点本地的内存缓存。在 Tyk 网关集群部署时,每个节点都有一份独立的缓存。这不仅会造成数据冗余,更严重的是,当一个节点的缓存因为 TTL 过期而去数据库更新了数据后,其他节点的缓存依然是旧的。一个可行的优化路径是切换到分布式缓存,如 Redis,以保证所有网关节点共享同一份缓存视图。

  2. 插件与业务的耦合: 虽然解耦了后端服务,但插件本身现在与看板的业务规则(特别是数据库 schema)紧密耦合。如果 cardscolumns 表的结构发生变化,插件代码也需要同步修改、编译和部署。这增加了维护的复杂度。对于更复杂的场景,可以考虑让插件调用一个专门的、轻量级的“规则服务”,而不是直接连接数据库。

  3. Fail-open 策略的风险: 当前的错误处理(如数据库查询失败)采用的是 fail-open 策略,即放行请求。这保证了系统的可用性,但在某些对规则执行有严格要求的场景下,可能会导致数据不一致(例如,暂时超出了 WIP 限制)。在未来的迭代中,可以引入基于配置的策略切换,允许针对不同 API 或环境选择 fail-open 或 fail-close。


  目录