基于PostgreSQL动态规则引擎与Redux状态管理的WAF实时攻防监控架构实现


我们面临一个棘手的挑战:传统的Web应用防火墙(WAF)规则更新机制过于静态,依赖于文件分发和手动重载,响应周期以小时甚至天为单位。在现代高强度、自动化的攻击面前,这种延迟是致命的。我们需要一个能够实现近乎实时规则下发、攻击事件秒级上报与响应的动态WAF管控平台。这意味着WAF的“大脑”必须从本地配置文件中解放出来,迁移到一个集中式、可编程、具备持久化能力的后端系统中。

方案选型与权衡

方案A:基于文件分发与inotify的准动态方案

这是一种常见的改良方案。通过一个中央服务生成规则文件,然后使用rsync等工具分发到各个WAF节点。WAF节点上的守护进程通过inotify监控文件变化,自动触发配置重载。

  • 优点:
    • 技术栈简单,易于实现。
    • 对现有WAF架构侵入性小。
  • 缺点:
    • 时效性差: 文件分发、inotify触发、服务重载整个链路存在分钟级的延迟。
    • 原子性问题: 在重载瞬间,可能会有短暂的服务不可用或规则不一致。
    • 状态管理缺失: 无法精细化管理规则版本、审计变更记录、聚合攻击日志进行关联分析。

方案B:基于API和数据库的实时动态方案

该方案将规则的生命周期管理完全交给一个后端服务。WAF节点不再是独立的决策单元,而是作为策略执行点(PEP),通过API向策略决策点(PDP)请求最新的规则集。

  • 优点:
    • 实时性: 规则变更可以通过API推送或拉取,时效性缩短至秒级。
    • 集中化与可编程: 所有规则集中存储在数据库中,可以构建复杂的管理界面,实现规则的编排、灰度发布、版本控制和审计。
    • 数据驱动: 攻击日志可以实时回传到中心数据库,为自动化的威胁分析和规则生成提供数据基础。
  • 缺点:
    • 架构复杂度高: 引入了新的中心服务和数据库依赖,成为系统关键路径上的新风险点。
    • 性能要求苛刻: 中心服务的API必须具备极高的可用性和极低的延迟,否则将直接影响所有业务流量。

最终我们选择了方案B。现代安全对抗的本质是速度对抗,方案A的延迟在我们的场景下无法接受。方案B的复杂度和性能挑战是工程问题,可以通过稳健的设计来解决。

整体架构设计

我们将系统拆分为三个核心部分:数据持久层、中心控制API、前端监控面板。

  1. 数据持久层 (PostgreSQL): 负责存储WAF规则、规则组、攻击事件日志以及操作审计记录。
  2. 中心控制API (Go): 为WAF节点提供规则拉取接口,接收WAF上报的攻击事件,并通过WebSocket向上层推送实时数据。
  3. 前端监控面板 (React + Redux): 实时可视化攻击事件,并提供规则管理界面,允许安全工程师动态调整策略。
graph TD
    subgraph "用户/攻击者"
        A[Client]
    end

    subgraph "边缘节点"
        B[Nginx + ModSecurity WAF]
    end

    subgraph "中心控制平面"
        C[Go API Service]
        D[PostgreSQL Database]
        E[WebSocket Server]
    end

    subgraph "安全运营中心 (SOC)"
        F[React/Redux Dashboard]
        G[Security Analyst]
    end

    A -- HTTP/HTTPS Request --> B
    B -- 1. Pull Rules (HTTP GET) --> C
    C -- 2. Query Rules --> D
    D -- 3. Return Rules --> C
    C -- 4. Return Ruleset JSON --> B
    B -- 5. Block/Allow Request --> A
    B -- 6. Report Attack Event (HTTP POST) --> C
    C -- 7. Store Event --> D
    C -- 8. Push Event via WebSocket --> E
    E -- 9. Real-time Event --> F
    G -- 10. Manage Rules --> F
    F -- 11. API Call (Create/Update Rule) --> C
    C -- 12. Update Rule in DB --> D

数据层:PostgreSQL Schema设计

选择PostgreSQL的核心原因在于其稳定性和强大的功能,特别是JSONB类型和GIN索引,它们为存储灵活多变的WAF规则提供了极大的便利。

规则表 (waf_rules)

这张表是核心,存储每一条独立的WAF规则。rule_content字段使用JSONB,可以容纳不同WAF引擎(如ModSecurity、Coraza)的特定指令格式。

-- waf_rules.sql

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE waf_rules (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    name VARCHAR(255) NOT NULL UNIQUE,
    description TEXT,
    -- 规则是否启用
    is_enabled BOOLEAN NOT NULL DEFAULT TRUE,
    -- 规则严重等级: 1-Low, 2-Medium, 3-High, 4-Critical
    severity SMALLINT NOT NULL CHECK (severity BETWEEN 1 AND 4),
    -- 规则动作: block, log, pass
    action VARCHAR(50) NOT NULL DEFAULT 'block',
    -- 使用JSONB存储规则具体内容,提供灵活性
    -- 例如: {"type": "modsecurity", "secrule": "REQUEST_URI \"@rx evil\""}
    rule_content JSONB NOT NULL,
    version INT NOT NULL DEFAULT 1,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- 为规则内容创建GIN索引以加速查询
CREATE INDEX idx_waf_rules_content ON waf_rules USING GIN (rule_content);

-- 自动更新updated_at时间戳
CREATE OR REPLACE FUNCTION trigger_set_timestamp()
RETURNS TRIGGER AS $$
BEGIN
  NEW.updated_at = NOW();
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER set_timestamp
BEFORE UPDATE ON waf_rules
FOR EACH ROW
EXECUTE FUNCTION trigger_set_timestamp();

COMMENT ON COLUMN waf_rules.severity IS '1: Low, 2: Medium, 3: High, 4: Critical';

攻击事件日志表 (attack_events)

这张表用于接收WAF上报的攻击日志。它被设计为写入密集型。为了性能,可以考虑使用分区表。

-- attack_events.sql

CREATE TABLE attack_events (
    id BIGSERIAL PRIMARY KEY,
    -- 关联触发的规则ID
    triggered_rule_id UUID REFERENCES waf_rules(id),
    client_ip INET NOT NULL,
    request_method VARCHAR(10) NOT NULL,
    request_uri TEXT NOT NULL,
    http_version VARCHAR(10),
    host_header VARCHAR(255),
    user_agent TEXT,
    -- 存储完整的攻击载荷或其他匹配信息
    matched_data JSONB,
    event_time TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- 在常用查询字段上创建索引
CREATE INDEX idx_attack_events_time ON attack_events (event_time DESC);
CREATE INDEX idx_attack_events_client_ip ON attack_events (client_ip);
CREATE INDEX idx_attack_events_triggered_rule_id ON attack_events (triggered_rule_id);

中心控制API:Go语言实现

Go语言的高并发性能和简洁的语法非常适合构建这类高性能API服务。我们将实现两个关键端点和一个WebSocket服务。

API服务主结构

我们使用gorilla/mux进行路由,pgx作为高性能的PostgreSQL驱动。

// main.go
package main

import (
	"context"
	"log"
	"net/http"
	"time"

	"github.com/gorilla/mux"
	"github.com/jackc/pgx/v4/pgxpool"
	"github.com/patrickmn/go-cache"
)

type App struct {
	Router *mux.Router
	DB     *pgxpool.Pool
	Cache  *cache.Cache // 内存缓存,减轻数据库压力
	Hub    *Hub         // WebSocket Hub
}

func (a *App) Initialize(dbConnectionString string) {
	var err error
	// 初始化数据库连接池
	a.DB, err = pgxpool.Connect(context.Background(), dbConnectionString)
	if err != nil {
		log.Fatalf("Unable to connect to database: %v\n", err)
	}

	// 初始化内存缓存,规则缓存5分钟,每10分钟清理一次过期的项
	a.Cache = cache.New(5*time.Minute, 10*time.Minute)
    
    // 初始化WebSocket Hub
    a.Hub = newHub()
    go a.Hub.run()

	a.Router = mux.NewRouter()
	a.initializeRoutes()
}

func (a *App) Run(addr string) {
    log.Printf("Server starting on %s", addr)
	log.Fatal(http.ListenAndServe(addr, a.Router))
}

func (a. *App) initializeRoutes() {
    // WAF拉取规则的接口,性能要求极高
	a.Router.HandleFunc("/api/v1/rules", a.getRules).Methods("GET")
    // WAF上报攻击事件的接口,写入要求高
	a.Router.HandleFunc("/api/v1/events", a.createEvent).Methods("POST")
    // WebSocket接入点
    a.Router.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
        serveWs(a.Hub, w, r)
    })
}

func main() {
	app := App{}
	// 在生产环境中,应使用环境变量或配置文件
	app.Initialize("postgres://user:password@localhost:5432/waf_control")
	app.Run(":8080")
}

规则拉取接口 (/api/v1/rules)

这个接口的性能至关重要。WAF节点会频繁调用它。因此,必须使用缓存。

// handlers.go
package main

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"time"
)

const rulesCacheKey = "active_waf_rules"

type Rule struct {
	ID          string          `json:"id"`
	Name        string          `json:"name"`
	RuleContent json.RawMessage `json:"rule_content"`
	Action      string          `json:"action"`
}

// getRules handles requests from WAF nodes to fetch the latest rule set.
func (a *App) getRules(w http.ResponseWriter, r *http.Request) {
	// 1. 尝试从内存缓存中获取规则
	if rules, found := a.Cache.Get(rulesCacheKey); found {
		respondWithJSON(w, http.StatusOK, rules)
		return
	}

	// 2. 如果缓存未命中,从数据库查询
	query := `
        SELECT id, name, rule_content, action 
        FROM waf_rules 
        WHERE is_enabled = TRUE 
        ORDER BY severity DESC;
    `
	rows, err := a.DB.Query(context.Background(), query)
	if err != nil {
		log.Printf("Error querying rules: %v", err)
		respondWithError(w, http.StatusInternalServerError, "Internal Server Error")
		return
	}
	defer rows.Close()

	var rules []Rule
	for rows.Next() {
		var rule Rule
		if err := rows.Scan(&rule.ID, &rule.Name, &rule.RuleContent, &rule.Action); err != nil {
			log.Printf("Error scanning rule row: %v", err)
			continue // 跳过有问题的行,而不是中断整个请求
		}
		rules = append(rules, rule)
	}

	if rows.Err() != nil {
		log.Printf("Error during rows iteration: %v", rows.Err())
		respondWithError(w, http.StatusInternalServerError, "Internal Server Error")
		return
	}

	// 3. 将查询结果存入缓存
	a.Cache.Set(rulesCacheKey, rules, cache.DefaultExpiration)
    
    // 一个常见的错误是在规则更新时没有主动清除缓存。
    // 在实现更新规则的API时,必须记得调用 a.Cache.Delete(rulesCacheKey)。

	respondWithJSON(w, http.StatusOK, rules)
}

func respondWithError(w http.ResponseWriter, code int, message string) {
	respondWithJSON(w, code, map[string]string{"error": message})
}

func respondWithJSON(w http.ResponseWriter, code int, payload interface{}) {
	response, _ := json.Marshal(payload)
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)
	w.Write(response)
}

事件上报接口 (/api/v1/events)

这个接口处理的是写入操作。为了应对高并发写入,一个优化思路是采用批处理,但这里为了简化,我们先实现单条写入。

// handlers.go (continued)
package main

import (
    // ... other imports
    "io/ioutil"
)

type AttackEvent struct {
	TriggeredRuleID string          `json:"triggered_rule_id"`
	ClientIP        string          `json:"client_ip"`
	RequestMethod   string          `json:"request_method"`
	RequestURI      string          `json:"request_uri"`
	MatchedData     json.RawMessage `json:"matched_data"`
}

func (a *App) createEvent(w http.ResponseWriter, r *http.Request) {
	var event AttackEvent
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		respondWithError(w, http.StatusBadRequest, "Invalid request body")
		return
	}
	defer r.Body.Close()

	if err := json.Unmarshal(body, &event); err != nil {
		respondWithError(w, http.StatusBadRequest, "Invalid JSON format")
		return
	}

	// 异步处理,立即返回202 Accepted,避免阻塞WAF
    // 在真实项目中,这里应该将事件推送到一个高吞吐的消息队列(如Kafka)
    // 然后由消费者服务进行落库和分发。
    // 为了演示,我们直接写入数据库并广播。
    go func(e AttackEvent, bodyBytes []byte) {
        query := `
            INSERT INTO attack_events (triggered_rule_id, client_ip, request_method, request_uri, matched_data)
            VALUES ($1, $2, $3, $4, $5);
        `
        _, err := a.DB.Exec(context.Background(), query, e.TriggeredRuleID, e.ClientIP, e.RequestMethod, e.RequestURI, e.MatchedData)
        if err != nil {
            log.Printf("Error inserting attack event: %v", err)
            // 这里需要有重试或死信队列机制
            return
        }
        
        // 成功入库后,通过WebSocket广播给所有前端客户端
        a.Hub.broadcast <- bodyBytes
    }(event, body)

	w.WriteHeader(http.StatusAccepted)
}

注:WebSocket的实现(hub.go, client.go)代码较为通用和冗长,这里省略,其核心功能是在Hub中维护一个客户端连接池,并通过一个channel接收createEvent中发来的消息,然后遍历所有客户端进行广播。

前端状态管理:Redux实时面板

前端的核心挑战在于处理源源不断的实时攻击事件流,同时还要管理复杂的UI状态(如规则编辑、筛选、排序)。Redux及其生态(Redux Toolkit, RTK Query)是应对这种复杂性的理想选择。

Redux Store 和 State 设计

我们的state shape需要清晰地分离不同领域的数据。

// store/index.ts
import { configureStore } from '@reduxjs/toolkit';
import eventsReducer from './slices/eventsSlice';
import rulesApi from './api/rulesApi'; // RTK Query API slice

export const store = configureStore({
  reducer: {
    events: eventsReducer,
    // RTK Query自动管理API相关的状态
    [rulesApi.reducerPath]: rulesApi.reducer,
  },
  middleware: (getDefaultMiddleware) =>
    getDefaultMiddleware().concat(rulesApi.middleware),
});

export type RootState = ReturnType<typeof store.getState>;
export type AppDispatch = typeof store.dispatch;

// State Shape (Conceptual)
// {
//   events: {
//     liveEvents: AttackEvent[], // 实时事件列表
//     connectionStatus: 'connected' | 'disconnected' | 'connecting',
//     filter: { ... }
//   },
//   rulesApi: { // Managed by RTK Query
//     queries: { ... },
//     mutations: { ... },
//     provided: { ... },
//     subscriptions: { ... },
//     config: { ... }
//   }
// }

使用Redux Toolkit管理实时事件流

我们创建一个eventsSlice来处理WebSocket连接状态和接收到的事件数据。

// store/slices/eventsSlice.ts
import { createSlice, PayloadAction } from '@reduxjs/toolkit';

interface AttackEvent {
  // ...与后端匹配的类型定义
  id: string; // 前端为每条消息生成唯一ID,用于React的key
  client_ip: string;
  request_uri: string;
  // ...
}

interface EventsState {
  liveEvents: AttackEvent[];
  connectionStatus: 'connected' | 'disconnected' | 'connecting';
}

const MAX_EVENTS_IN_STATE = 200; // 内存中最多保留200条事件,防止内存溢出

const initialState: EventsState = {
  liveEvents: [],
  connectionStatus: 'disconnected',
};

const eventsSlice = createSlice({
  name: 'events',
  initialState,
  reducers: {
    setConnectionStatus(state, action: PayloadAction<EventsState['connectionStatus']>) {
      state.connectionStatus = action.payload;
    },
    newEventReceived(state, action: PayloadAction<Omit<AttackEvent, 'id'>>) {
      // 这里的坑在于,不能无限制地往数组里塞数据
      // 必须有一个淘汰策略,否则浏览器内存会爆炸
      const newEvent = { ...action.payload, id: new Date().toISOString() + Math.random() };
      state.liveEvents.unshift(newEvent); // 新事件放在最前面
      if (state.liveEvents.length > MAX_EVENTS_IN_STATE) {
        state.liveEvents.pop(); // 移除最旧的事件
      }
    },
    clearEvents(state) {
      state.liveEvents = [];
    }
  },
});

export const { setConnectionStatus, newEventReceived, clearEvents } = eventsSlice.actions;
export default eventsSlice.reducer;

WebSocket中间件

要将WebSocket与Redux连接起来,可以编写一个简单的中间件。

// middleware/socketMiddleware.ts
import { Middleware } from '@reduxjs/toolkit';
import { setConnectionStatus, newEventReceived } from '../slices/eventsSlice';

const socketMiddleware: Middleware = (store) => {
  let socket: WebSocket | null = null;

  return (next) => (action) => {
    // 假设我们有CONNECT和DISCONNECT action来控制连接
    if (action.type === 'socket/connect') {
      if (socket !== null) {
        socket.close();
      }

      store.dispatch(setConnectionStatus('connecting'));
      socket = new WebSocket(action.payload.url);

      socket.onopen = () => {
        store.dispatch(setConnectionStatus('connected'));
      };

      socket.onmessage = (event) => {
        try {
          const data = JSON.parse(event.data);
          // 这里的逻辑至关重要,它将外部数据源(WebSocket)的变化
          // 转化为了一个Redux action,从而融入了Redux的单向数据流。
          store.dispatch(newEventReceived(data));
        } catch (e) {
          console.error('Error parsing WebSocket message:', e);
        }
      };

      socket.onclose = () => {
        store.dispatch(setConnectionStatus('disconnected'));
        socket = null;
      };

      socket.onerror = (error) => {
        console.error('WebSocket Error:', error);
        store.dispatch(setConnectionStatus('disconnected'));
      };
    }

    if (action.type === 'socket/disconnect' && socket) {
      socket.close();
    }
    
    return next(action);
  };
};

export default socketMiddleware;

这个中间件让我们可以通过dispatch({ type: 'socket/connect', payload: { url: 'ws://...' }})来启动连接,并将所有收到的消息自动派发为newEventReceived action,UI组件只需订阅liveEvents state即可实现实时更新。

架构的局限性与未来迭代路径

当前这套架构解决了核心的实时性问题,但在生产环境中还存在一些局限性:

  1. 数据库瓶颈: 随着攻击事件量的增长,attack_events表的写入会成为瓶颈。未来的迭代路径是引入消息队列(如Kafka或Pulsar)作为API和数据库之间的缓冲层,实现削峰填谷。事件数据可以进一步流向ClickHouse这类OLAP数据库,以进行更高效的聚合分析。
  2. API可用性: 中心API是单点故障。必须部署为高可用集群,并置于负载均衡器之后。规则拉取接口的缓存策略也需要更精细化,例如使用分布式缓存(Redis)替代单机内存缓存。
  3. 规则引擎的智能性: 目前的规则系统还是基于人工定义。下一步是收集海量的attack_events数据,利用这些数据训练机器学习模型,自动发现异常模式并生成候选规则,实现从“被动防御”到“主动智能防御”的演进。
  4. 前端性能: 当事件频率极高时(例如DDoS攻击期间),向Redux state中无差别地推入所有事件会导致UI频繁重绘,甚至卡死。需要引入节流(throttling)或聚合策略,例如在UI上只显示采样后的事件,或者按类型聚合统计数据,而不是展示原始事件流。

  目录