在我们的 MLOps 平台演进过程中,一个核心的痛点逐渐浮现:模型部署流程严重依赖基础设施团队。数据科学家每发布一个新模型或更新一个版本,都需要提交工单,等待 SRE 手动配置 Nginx Ingress、更新服务发现规则,并小心翼翼地执行灰度发布。这个流程不仅迟缓,而且极易出错,严重拖慢了从模型训练完成到线上A/B测试的迭代速度。我们需要的是一个能让数据科学家“自助”管理模型上线的内部平台,其核心是一个能理解模型版本和流量分配的智能服务网关。
定义问题:一个面向 MLOps 的动态流量调度层
我们的目标是构建一个系统,它必须满足以下几个核心需求:
- 动态路由:网关能根据一个集中的配置源,实时更新路由规则,将外部请求(如
/predict/user-churn-v1
)代理到内部正确的模型服务实例(如http://churn-model-svc-v1:8000
)。 - 加权流量分配:支持对同一模型的不同版本进行流量切分。例如,将 90% 的流量导向稳定的
v1
版本,同时将 10% 的“影子”流量导向新上线的v2
版本,以进行在线评估。 - 自助式管理界面:提供一个简洁的Web界面,让数据科学家可以自行注册新模型、配置路由规则、调整流量权重,而无需理解底层 Kubernetes 或服务网格的复杂性。
- 高可用与高性能:作为所有模型预测流量的入口,该网关必须是轻量、高效且无单点故障的。
方案A:基于成熟 API 网关(如 Kong/APISIX)与服务网格的重量级方案
第一种思路是利用业界成熟的开源组件。我们可以使用 Kong 或 APISIX 作为核心网关,利用其强大的插件生态和 Admin API 来动态管理路由。后端模型服务可以部署在 Kubernetes 上,并结合 Istio 或 Linkerd 这样的服务网格来实现精细化的流量切分。
优势:
- 功能完备:开箱即用地获得了认证、授权、限流、熔断等企业级网关功能。
- 社区成熟:遇到问题时,有大量的文档和社区支持。
- 生态集成:能很好地与 Prometheus、Jaeger 等云原生可观测性工具集成。
劣势:
- 过度复杂:对于我们当前的核心痛点——动态模型路由,引入一整套服务网格和重量级 API 网关显得“杀鸡用牛刀”。团队需要投入巨大的学习成本来理解和维护这些复杂的组件。在真实项目中,运维一个高可用的 Istio 集群本身就是一个巨大的挑战。
- 配置黑盒:虽然它们提供了 API,但其内部路由和流量管理的实现对我们来说是一个黑盒。当出现复杂的路由问题时,调试难度非常大。
- 资源开销:这些组件,特别是服务网格的数据平面代理(如 Envoy),会引入额外的资源消耗和网络延迟,对于低延迟敏感的模型服务可能不是最优选择。
方案B:自研轻量级 Go 代理 + React 控制平面的定制化方案
第二种思路是回归问题本质。我们最核心的需求是“动态路由”和“加权负载均衡”。这完全可以通过构建一个极简的反向代理来实现,该代理的路由表由一个独立的控制平面服务来管理。
优势:
- 极简可控:我们可以用 Go 的
net/http/httputil
在几百行代码内实现一个高性能的反向代理。所有的路由逻辑都是我们自己编写的,完全白盒,易于理解、调试和扩展。 - 性能卓越:Go 语言在网络编程和并发处理上表现优异,构建的代理服务本身资源占用极低,网络转发性能极高。
- 与业务场景强耦合:我们可以为 MLOps 场景量身定做 API 和数据模型,例如直接支持“模型”、“版本”、“流量权重”等概念,而不是去适配通用的“上游”、“服务”、“路由”等网关术语。
- 快速迭代:前端使用 Chakra UI 这样的组件库,可以极快地搭建出功能完善且美观的内部管理界面,后端一个简单的 CRUD API 即可驱动整个系统。
- 极简可控:我们可以用 Go 的
劣势:
- 重复造轮子:认证、限流等高级功能需要自行实现或集成第三方库。
- 维护成本:虽然代码简单,但这部分代码的稳定性和可用性需要我们自己的团队来负责。
最终决策与理由
我们选择了方案B。理由非常务实:在一个内部平台项目中,解决核心问题的最简单直接的方案,通常就是最好的方案。方案A虽然强大,但它带来的运维复杂性和学习成本远远超过了它所带来的好处。我们的目标不是构建一个通用的 API 网关,而是解决 MLOps 流程中的一个特定瓶颈。自研一个轻量级的代理,可以将所有复杂性都控制在自己手中,确保系统的可维护性和长期演进能力。
核心实现概览
整个系统分为三个部分:动态代理层 (Data Plane)**、控制平面API (Control Plane)** 和 **管理前端 (UI)**。
graph TD subgraph User Interaction A[数据科学家] --> B{Chakra UI 管理界面}; end subgraph Control Plane B -- "发起API请求 (增/删/改路由)" --> C[Go Control API]; C -- "更新路由配置" --> D[(Redis)]; end subgraph Data Plane E[外部请求] --> F{Go 动态代理}; F -- "周期性拉取配置" --> D; F -- "根据权重代理流量" --> G1[模型服务 v1]; F -- "根据权重代理流量" --> G2[模型服务 v2]; F -- "根据权重代理流量" --> G3[模型服务 vN]; end style F fill:#f9f,stroke:#333,stroke-width:2px style C fill:#ccf,stroke:#333,stroke-width:2px style B fill:#cfc,stroke:#333,stroke-width:2px
1. 动态代理层:Go 实现的高性能反向代理
这是系统的核心。它是一个无状态的服务,可以水平扩展。它的唯一职责就是根据从 Redis 中获取的路由规则来转发 HTTP 请求。
main.go
:
package main
import (
"context"
"encoding/json"
"log"
"math/rand"
"net/http"
"net/http/httputil"
"net/url"
"strings"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
// Target defines an upstream server for a model version.
type Target struct {
URL string `json:"url"`
Weight int `json:"weight"`
}
// Route defines the routing rule for a specific model path.
type Route struct {
Path string `json:"path"`
Targets []Target `json:"targets"`
}
// RouteManager holds the routing table and handles its updates.
type RouteManager struct {
mu sync.RWMutex
routes map[string][]Target
rdb *redis.Client
}
// NewRouteManager creates a new manager and starts the background poller.
func NewRouteManager(redisAddr string) *RouteManager {
rdb := redis.NewClient(&redis.Options{
Addr: redisAddr,
})
manager := &RouteManager{
routes: make(map[string][]Target),
rdb: rdb,
}
// Initial load
if err := manager.updateRoutes(); err != nil {
log.Printf("[ERROR] Initial route update failed: %v", err)
}
// Start background ticker to periodically update routes
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
if err := manager.updateRoutes(); err != nil {
log.Printf("[ERROR] Background route update failed: %v", err)
}
}
}()
return manager
}
// updateRoutes fetches all route configurations from Redis and updates the in-memory map.
func (rm *RouteManager) updateRoutes() error {
ctx := context.Background()
keys, err := rm.rdb.Keys(ctx, "routes:*").Result()
if err != nil {
return err
}
newRoutes := make(map[string][]Target)
for _, key := range keys {
val, err := rm.rdb.Get(ctx, key).Result()
if err != nil {
log.Printf("[WARN] Failed to get key %s: %v", key, err)
continue
}
var route Route
if err := json.Unmarshal([]byte(val), &route); err != nil {
log.Printf("[WARN] Failed to unmarshal route for key %s: %v", key, err)
continue
}
newRoutes[route.Path] = route.Targets
}
rm.mu.Lock()
rm.routes = newRoutes
rm.mu.Unlock()
log.Printf("[INFO] Routes updated. Total routes: %d", len(newRoutes))
return nil
}
// getTarget selects a target based on weighted random selection.
func (rm *RouteManager) getTarget(path string) *url.URL {
rm.mu.RLock()
defer rm.mu.RUnlock()
targets, ok := rm.routes[path]
if !ok || len(targets) == 0 {
return nil
}
totalWeight := 0
for _, target := range targets {
totalWeight += target.Weight
}
if totalWeight == 0 {
// If all weights are 0, fall back to random selection
randIdx := rand.Intn(len(targets))
targetURL, _ := url.Parse(targets[randIdx].URL)
return targetURL
}
r := rand.Intn(totalWeight)
cumulativeWeight := 0
for _, target := range targets {
cumulativeWeight += target.Weight
if r < cumulativeWeight {
targetURL, err := url.Parse(target.URL)
if err != nil {
log.Printf("[ERROR] Invalid target URL %s: %v", target.URL, err)
return nil
}
return targetURL
}
}
return nil
}
func main() {
rand.Seed(time.Now().UnixNano())
// A common mistake is to hardcode Redis address. Use environment variables in production.
redisAddr := "localhost:6379"
routeManager := NewRouteManager(redisAddr)
proxyHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Match path prefix. In a real project, you might want a more sophisticated router like a trie.
var matchedPath string
for path := range routeManager.routes {
if strings.HasPrefix(r.URL.Path, path) {
matchedPath = path
break
}
}
if matchedPath == "" {
http.Error(w, "Service not found", http.StatusNotFound)
return
}
targetURL := routeManager.getTarget(matchedPath)
if targetURL == nil {
http.Error(w, "No available upstream for this service", http.StatusServiceUnavailable)
return
}
log.Printf("[INFO] Request %s %s -> %s", r.Method, r.URL.Path, targetURL)
proxy := httputil.NewSingleHostReverseProxy(targetURL)
// A crucial part: modify the request header before forwarding.
// This ensures backend services can correctly identify the client IP.
proxy.Director = func(req *http.Request) {
req.URL.Scheme = targetURL.Scheme
req.URL.Host = targetURL.Host
req.URL.Path = r.URL.Path
req.Host = targetURL.Host // Set Host header
// Clean up headers that might leak internal information.
req.Header.Del("X-Forwarded-For")
// Append the client's IP to X-Forwarded-For
// In production, consider trusted proxies.
clientIP := r.RemoteAddr
if colon := strings.LastIndex(clientIP, ":"); colon != -1 {
clientIP = clientIP[:colon]
}
req.Header.Set("X-Forwarded-For", clientIP)
}
// Implement error handling for the proxy
proxy.ErrorHandler = func(rw http.ResponseWriter, req *http.Request, err error) {
log.Printf("[ERROR] Proxy error: %v", err)
rw.WriteHeader(http.StatusBadGateway)
}
proxy.ServeHTTP(w, r)
})
log.Println("[INFO] Starting MLOps Gateway on :8080")
if err := http.ListenAndServe(":8080", proxyHandler); err != nil {
log.Fatalf("[FATAL] Server failed to start: %v", err)
}
}
这段代码的核心在于 RouteManager
,它在后台每5秒从 Redis 拉取最新的路由配置,并将其缓存在内存中。getTarget
方法实现了加权随机算法,这是实现流量切分功能的关键。
2. 控制平面API:简单的 CRUD 接口
这个API服务同样用 Go 实现,它提供给前端界面调用,用于修改存储在 Redis 中的路由规则。
control_api.go
:
// This is a simplified example, intended to run in a separate process or as part of the same binary with a different port.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
)
// Re-using the same structs
type Target struct {
URL string `json:"url"`
Weight int `json:"weight"`
}
type Route struct {
ID string `json:"id"`
Path string `json:"path"`
Targets []Target `json:"targets"`
}
var rdb *redis.Client
func routeHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", "*") // For development only!
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
w.Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization")
if r.Method == "OPTIONS" {
w.WriteHeader(http.StatusOK)
return
}
switch r.Method {
case http.MethodPost:
var route Route
if err := json.NewDecoder(r.Body).Decode(&route); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// Basic validation
if route.Path == "" || len(route.Targets) == 0 {
http.Error(w, "path and targets are required", http.StatusBadRequest)
return
}
// In a real project, validate that weights sum up to 100 if you want percentage-based logic.
// Here, we just use raw weights.
route.ID = uuid.New().String()
routeJSON, _ := json.Marshal(route)
// The key format `routes:{path}` makes it easy to query and manage.
err := rdb.Set(context.Background(), fmt.Sprintf("routes:%s", route.Path), routeJSON, 0).Err()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(route)
case http.MethodGet:
keys, err := rdb.Keys(context.Background(), "routes:*").Result()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
routes := []Route{}
for _, key := range keys {
val, _ := rdb.Get(context.Background(), key).Result()
var route Route
json.Unmarshal([]byte(val), &route)
routes = append(routes, route)
}
json.NewEncoder(w).Encode(routes)
// DELETE and PUT handlers would be here
default:
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
func main() {
rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
http.HandleFunc("/api/routes", routeHandler)
log.Println("[INFO] Starting Control Plane API on :8081")
if err := http.ListenAndServe(":8081", nil); err != nil {
log.Fatalf("[FATAL] API server failed to start: %v", err)
}
}
3. 管理前端:基于 Chakra UI 的 React 仪表盘
前端的目标是提供一个清晰、无干扰的界面。Chakra UI 的组合式组件和优秀的默认样式非常适合快速搭建这类内部工具。
RouteManagement.js
:
import React, { useState, useEffect } from 'react';
import {
Box, Heading, Button, VStack, HStack,
Table, Thead, Tbody, Tr, Th, Td,
FormControl, FormLabel, Input, useToast,
IconButton,
} from '@chakra-ui/react';
import { AddIcon, DeleteIcon } from '@chakra-ui/icons';
import axios from 'axios';
// The API endpoint should be configured via environment variables.
const API_URL = 'http://localhost:8081/api/routes';
function RouteManagement() {
const [routes, setRoutes] = useState([]);
const [path, setPath] = useState('');
const [targets, setTargets] = useState([{ url: '', weight: 100 }]);
const toast = useToast();
const fetchRoutes = async () => {
try {
const response = await axios.get(API_URL);
setRoutes(response.data || []);
} catch (error) {
toast({
title: 'Failed to fetch routes',
description: error.message,
status: 'error',
duration: 5000,
isClosable: true,
});
}
};
useEffect(() => {
fetchRoutes();
}, []);
const handleAddTarget = () => {
setTargets([...targets, { url: '', weight: 0 }]);
};
const handleTargetChange = (index, field, value) => {
const newTargets = [...targets];
// A common mistake is to not parse integer values from input fields.
newTargets[index][field] = field === 'weight' ? parseInt(value, 10) || 0 : value;
setTargets(newTargets);
};
const handleRemoveTarget = (index) => {
const newTargets = targets.filter((_, i) => i !== index);
setTargets(newTargets);
};
const handleSubmit = async (e) => {
e.preventDefault();
const totalWeight = targets.reduce((sum, t) => sum + t.weight, 0);
// In a real project, this validation is crucial.
if (totalWeight <= 0) {
toast({ title: 'Total weight must be greater than 0', status: 'error' });
return;
}
const newRoute = { path, targets };
try {
await axios.post(API_URL, newRoute);
toast({ title: 'Route created successfully', status: 'success' });
fetchRoutes(); // Refresh list
// Reset form
setPath('');
setTargets([{ url: '', weight: 100 }]);
} catch (error) {
toast({ title: 'Failed to create route', description: error.message, status: 'error' });
}
};
return (
<Box p={8}>
<Heading mb={6}>MLOps Model Route Management</Heading>
<Box as="form" onSubmit={handleSubmit} p={6} borderWidth={1} borderRadius="md" mb={8}>
<VStack spacing={4}>
<FormControl isRequired>
<FormLabel>Request Path</FormLabel>
<Input
placeholder="/predict/my-model"
value={path}
onChange={(e) => setPath(e.target.value)}
/>
</FormControl>
{targets.map((target, index) => (
<HStack key={index} width="100%">
<FormControl isRequired>
<FormLabel>Target URL {index + 1}</FormLabel>
<Input
placeholder="http://model-svc-v1:8000"
value={target.url}
onChange={(e) => handleTargetChange(index, 'url', e.target.value)}
/>
</FormControl>
<FormControl isRequired width="200px">
<FormLabel>Weight</FormLabel>
<Input
type="number"
value={target.weight}
onChange={(e) => handleTargetChange(index, 'weight', e.target.value)}
/>
</FormControl>
{targets.length > 1 && (
<IconButton
aria-label="Remove target"
icon={<DeleteIcon />}
onClick={() => handleRemoveTarget(index)}
alignSelf="flex-end"
/>
)}
</HStack>
))}
<Button leftIcon={<AddIcon />} onClick={handleAddTarget} alignSelf="flex-start">
Add Another Target (for Canary/Shadow)
</Button>
<Button type="submit" colorScheme="blue" width="100%">Create Route</Button>
</VStack>
</Box>
<Heading size="lg" mb={4}>Current Routes</Heading>
<Table variant="simple">
<Thead>
<Tr>
<Th>Path</Th>
<Th>Targets</Th>
</Tr>
</Thead>
<Tbody>
{routes.map((route) => (
<Tr key={route.id}>
<Td>{route.path}</Td>
<Td>
<VStack align="start">
{route.targets.map((t, i) => (
<Box key={i}>{t.url} (Weight: {t.weight})</Box>
))}
</VStack>
</Td>
</Tr>
))}
</Tbody>
</Table>
</Box>
);
}
export default RouteManagement;
这段 React 代码展示了如何使用 Chakra UI 的 FormControl
, Input
, Table
等组件快速构建一个结构清晰的管理界面。它处理了表单状态、动态增删目标地址以及与后端 API 的交互。
架构的扩展性与局限性
这个轻量级的方案为我们未来的扩展打下了坚实的基础。例如,我们可以在 Go 代理的中间件层轻松地加入基于 JWT 的认证,或者集成 Prometheus 客户端来暴露每个路由的请求延迟和状态码指标。控制平面也可以进一步与 Gitlab CI 集成,实现模型训练流水线成功后自动注册新版本到网关。
然而,这个方案的局限性也同样明显。它并非一个通用的 API 网关。它缺少复杂的插件机制、高级的流量治理策略(如复杂的重试和熔断逻辑)以及一个庞大的社区生态。如果业务需求演进到需要复杂的 WAF 防护、GraphQL 代理或 gRPC-Web 转码等功能,那么重新评估并迁移到像 Envoy 或 APISIX 这样的成熟网关将是更明智的选择。我们当前构建的,是一个高度专注、解决特定 MLOps 痛点的内部工具,它的价值在于其简洁、高效和完全可控。