构建支持动态路由与模型生命周期管理的 MLOps 服务网关


在我们的 MLOps 平台演进过程中,一个核心的痛点逐渐浮现:模型部署流程严重依赖基础设施团队。数据科学家每发布一个新模型或更新一个版本,都需要提交工单,等待 SRE 手动配置 Nginx Ingress、更新服务发现规则,并小心翼翼地执行灰度发布。这个流程不仅迟缓,而且极易出错,严重拖慢了从模型训练完成到线上A/B测试的迭代速度。我们需要的是一个能让数据科学家“自助”管理模型上线的内部平台,其核心是一个能理解模型版本和流量分配的智能服务网关。

定义问题:一个面向 MLOps 的动态流量调度层

我们的目标是构建一个系统,它必须满足以下几个核心需求:

  1. 动态路由:网关能根据一个集中的配置源,实时更新路由规则,将外部请求(如 /predict/user-churn-v1)代理到内部正确的模型服务实例(如 http://churn-model-svc-v1:8000)。
  2. 加权流量分配:支持对同一模型的不同版本进行流量切分。例如,将 90% 的流量导向稳定的 v1 版本,同时将 10% 的“影子”流量导向新上线的 v2 版本,以进行在线评估。
  3. 自助式管理界面:提供一个简洁的Web界面,让数据科学家可以自行注册新模型、配置路由规则、调整流量权重,而无需理解底层 Kubernetes 或服务网格的复杂性。
  4. 高可用与高性能:作为所有模型预测流量的入口,该网关必须是轻量、高效且无单点故障的。

方案A:基于成熟 API 网关(如 Kong/APISIX)与服务网格的重量级方案

第一种思路是利用业界成熟的开源组件。我们可以使用 Kong 或 APISIX 作为核心网关,利用其强大的插件生态和 Admin API 来动态管理路由。后端模型服务可以部署在 Kubernetes 上,并结合 Istio 或 Linkerd 这样的服务网格来实现精细化的流量切分。

  • 优势:

    • 功能完备:开箱即用地获得了认证、授权、限流、熔断等企业级网关功能。
    • 社区成熟:遇到问题时,有大量的文档和社区支持。
    • 生态集成:能很好地与 Prometheus、Jaeger 等云原生可观测性工具集成。
  • 劣势:

    • 过度复杂:对于我们当前的核心痛点——动态模型路由,引入一整套服务网格和重量级 API 网关显得“杀鸡用牛刀”。团队需要投入巨大的学习成本来理解和维护这些复杂的组件。在真实项目中,运维一个高可用的 Istio 集群本身就是一个巨大的挑战。
    • 配置黑盒:虽然它们提供了 API,但其内部路由和流量管理的实现对我们来说是一个黑盒。当出现复杂的路由问题时,调试难度非常大。
    • 资源开销:这些组件,特别是服务网格的数据平面代理(如 Envoy),会引入额外的资源消耗和网络延迟,对于低延迟敏感的模型服务可能不是最优选择。

方案B:自研轻量级 Go 代理 + React 控制平面的定制化方案

第二种思路是回归问题本质。我们最核心的需求是“动态路由”和“加权负载均衡”。这完全可以通过构建一个极简的反向代理来实现,该代理的路由表由一个独立的控制平面服务来管理。

  • 优势:

    • 极简可控:我们可以用 Go 的 net/http/httputil 在几百行代码内实现一个高性能的反向代理。所有的路由逻辑都是我们自己编写的,完全白盒,易于理解、调试和扩展。
    • 性能卓越:Go 语言在网络编程和并发处理上表现优异,构建的代理服务本身资源占用极低,网络转发性能极高。
    • 与业务场景强耦合:我们可以为 MLOps 场景量身定做 API 和数据模型,例如直接支持“模型”、“版本”、“流量权重”等概念,而不是去适配通用的“上游”、“服务”、“路由”等网关术语。
    • 快速迭代:前端使用 Chakra UI 这样的组件库,可以极快地搭建出功能完善且美观的内部管理界面,后端一个简单的 CRUD API 即可驱动整个系统。
  • 劣势:

    • 重复造轮子:认证、限流等高级功能需要自行实现或集成第三方库。
    • 维护成本:虽然代码简单,但这部分代码的稳定性和可用性需要我们自己的团队来负责。

最终决策与理由

我们选择了方案B。理由非常务实:在一个内部平台项目中,解决核心问题的最简单直接的方案,通常就是最好的方案。方案A虽然强大,但它带来的运维复杂性和学习成本远远超过了它所带来的好处。我们的目标不是构建一个通用的 API 网关,而是解决 MLOps 流程中的一个特定瓶颈。自研一个轻量级的代理,可以将所有复杂性都控制在自己手中,确保系统的可维护性和长期演进能力。

核心实现概览

整个系统分为三个部分:动态代理层 (Data Plane)**、控制平面API (Control Plane)** 和 **管理前端 (UI)**。

graph TD
    subgraph User Interaction
        A[数据科学家] --> B{Chakra UI 管理界面};
    end

    subgraph Control Plane
        B -- "发起API请求 (增/删/改路由)" --> C[Go Control API];
        C -- "更新路由配置" --> D[(Redis)];
    end

    subgraph Data Plane
        E[外部请求] --> F{Go 动态代理};
        F -- "周期性拉取配置" --> D;
        F -- "根据权重代理流量" --> G1[模型服务 v1];
        F -- "根据权重代理流量" --> G2[模型服务 v2];
        F -- "根据权重代理流量" --> G3[模型服务 vN];
    end

    style F fill:#f9f,stroke:#333,stroke-width:2px
    style C fill:#ccf,stroke:#333,stroke-width:2px
    style B fill:#cfc,stroke:#333,stroke-width:2px

1. 动态代理层:Go 实现的高性能反向代理

这是系统的核心。它是一个无状态的服务,可以水平扩展。它的唯一职责就是根据从 Redis 中获取的路由规则来转发 HTTP 请求。

main.go:

package main

import (
	"context"
	"encoding/json"
	"log"
	"math/rand"
	"net/http"
	"net/http/httputil"
	"net/url"
	"strings"
	"sync"
	"time"

	"github.com/go-redis/redis/v8"
)

// Target defines an upstream server for a model version.
type Target struct {
	URL    string `json:"url"`
	Weight int    `json:"weight"`
}

// Route defines the routing rule for a specific model path.
type Route struct {
	Path    string   `json:"path"`
	Targets []Target `json:"targets"`
}

// RouteManager holds the routing table and handles its updates.
type RouteManager struct {
	mu     sync.RWMutex
	routes map[string][]Target
	rdb    *redis.Client
}

// NewRouteManager creates a new manager and starts the background poller.
func NewRouteManager(redisAddr string) *RouteManager {
	rdb := redis.NewClient(&redis.Options{
		Addr: redisAddr,
	})
	
	manager := &RouteManager{
		routes: make(map[string][]Target),
		rdb:    rdb,
	}

	// Initial load
	if err := manager.updateRoutes(); err != nil {
		log.Printf("[ERROR] Initial route update failed: %v", err)
	}

	// Start background ticker to periodically update routes
	go func() {
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()
		for range ticker.C {
			if err := manager.updateRoutes(); err != nil {
				log.Printf("[ERROR] Background route update failed: %v", err)
			}
		}
	}()

	return manager
}

// updateRoutes fetches all route configurations from Redis and updates the in-memory map.
func (rm *RouteManager) updateRoutes() error {
	ctx := context.Background()
	keys, err := rm.rdb.Keys(ctx, "routes:*").Result()
	if err != nil {
		return err
	}

	newRoutes := make(map[string][]Target)
	for _, key := range keys {
		val, err := rm.rdb.Get(ctx, key).Result()
		if err != nil {
			log.Printf("[WARN] Failed to get key %s: %v", key, err)
			continue
		}
		var route Route
		if err := json.Unmarshal([]byte(val), &route); err != nil {
			log.Printf("[WARN] Failed to unmarshal route for key %s: %v", key, err)
			continue
		}
		newRoutes[route.Path] = route.Targets
	}

	rm.mu.Lock()
	rm.routes = newRoutes
	rm.mu.Unlock()
	
	log.Printf("[INFO] Routes updated. Total routes: %d", len(newRoutes))
	return nil
}

// getTarget selects a target based on weighted random selection.
func (rm *RouteManager) getTarget(path string) *url.URL {
	rm.mu.RLock()
	defer rm.mu.RUnlock()

	targets, ok := rm.routes[path]
	if !ok || len(targets) == 0 {
		return nil
	}

	totalWeight := 0
	for _, target := range targets {
		totalWeight += target.Weight
	}

	if totalWeight == 0 {
		// If all weights are 0, fall back to random selection
		randIdx := rand.Intn(len(targets))
		targetURL, _ := url.Parse(targets[randIdx].URL)
		return targetURL
	}

	r := rand.Intn(totalWeight)
	cumulativeWeight := 0
	for _, target := range targets {
		cumulativeWeight += target.Weight
		if r < cumulativeWeight {
			targetURL, err := url.Parse(target.URL)
			if err != nil {
				log.Printf("[ERROR] Invalid target URL %s: %v", target.URL, err)
				return nil
			}
			return targetURL
		}
	}
	
	return nil
}

func main() {
	rand.Seed(time.Now().UnixNano())
	
	// A common mistake is to hardcode Redis address. Use environment variables in production.
	redisAddr := "localhost:6379" 
	routeManager := NewRouteManager(redisAddr)

	proxyHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Match path prefix. In a real project, you might want a more sophisticated router like a trie.
		var matchedPath string
		for path := range routeManager.routes {
			if strings.HasPrefix(r.URL.Path, path) {
				matchedPath = path
				break
			}
		}

		if matchedPath == "" {
			http.Error(w, "Service not found", http.StatusNotFound)
			return
		}
		
		targetURL := routeManager.getTarget(matchedPath)
		if targetURL == nil {
			http.Error(w, "No available upstream for this service", http.StatusServiceUnavailable)
			return
		}

		log.Printf("[INFO] Request %s %s -> %s", r.Method, r.URL.Path, targetURL)

		proxy := httputil.NewSingleHostReverseProxy(targetURL)
		
		// A crucial part: modify the request header before forwarding.
		// This ensures backend services can correctly identify the client IP.
		proxy.Director = func(req *http.Request) {
			req.URL.Scheme = targetURL.Scheme
			req.URL.Host = targetURL.Host
			req.URL.Path = r.URL.Path
			req.Host = targetURL.Host // Set Host header
			
			// Clean up headers that might leak internal information.
			req.Header.Del("X-Forwarded-For")
			
			// Append the client's IP to X-Forwarded-For
			// In production, consider trusted proxies.
			clientIP := r.RemoteAddr
			if colon := strings.LastIndex(clientIP, ":"); colon != -1 {
				clientIP = clientIP[:colon]
			}
			req.Header.Set("X-Forwarded-For", clientIP)
		}
		
		// Implement error handling for the proxy
		proxy.ErrorHandler = func(rw http.ResponseWriter, req *http.Request, err error) {
			log.Printf("[ERROR] Proxy error: %v", err)
			rw.WriteHeader(http.StatusBadGateway)
		}

		proxy.ServeHTTP(w, r)
	})

	log.Println("[INFO] Starting MLOps Gateway on :8080")
	if err := http.ListenAndServe(":8080", proxyHandler); err != nil {
		log.Fatalf("[FATAL] Server failed to start: %v", err)
	}
}

这段代码的核心在于 RouteManager,它在后台每5秒从 Redis 拉取最新的路由配置,并将其缓存在内存中。getTarget 方法实现了加权随机算法,这是实现流量切分功能的关键。

2. 控制平面API:简单的 CRUD 接口

这个API服务同样用 Go 实现,它提供给前端界面调用,用于修改存储在 Redis 中的路由规则。

control_api.go:

// This is a simplified example, intended to run in a separate process or as part of the same binary with a different port.
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "github.com/go-redis/redis/v8"
    "github.com/google/uuid"
)

// Re-using the same structs
type Target struct {
	URL    string `json:"url"`
	Weight int    `json:"weight"`
}
type Route struct {
	ID      string   `json:"id"`
	Path    string   `json:"path"`
	Targets []Target `json:"targets"`
}

var rdb *redis.Client

func routeHandler(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "application/json")
    w.Header().Set("Access-Control-Allow-Origin", "*") // For development only!
    w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
    w.Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization")

    if r.Method == "OPTIONS" {
        w.WriteHeader(http.StatusOK)
        return
    }

    switch r.Method {
    case http.MethodPost:
        var route Route
        if err := json.NewDecoder(r.Body).Decode(&route); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }

        // Basic validation
        if route.Path == "" || len(route.Targets) == 0 {
             http.Error(w, "path and targets are required", http.StatusBadRequest)
            return
        }
        
        // In a real project, validate that weights sum up to 100 if you want percentage-based logic.
        // Here, we just use raw weights.

        route.ID = uuid.New().String()
        routeJSON, _ := json.Marshal(route)

        // The key format `routes:{path}` makes it easy to query and manage.
        err := rdb.Set(context.Background(), fmt.Sprintf("routes:%s", route.Path), routeJSON, 0).Err()
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        json.NewEncoder(w).Encode(route)

    case http.MethodGet:
        keys, err := rdb.Keys(context.Background(), "routes:*").Result()
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        routes := []Route{}
        for _, key := range keys {
            val, _ := rdb.Get(context.Background(), key).Result()
            var route Route
            json.Unmarshal([]byte(val), &route)
            routes = append(routes, route)
        }
        json.NewEncoder(w).Encode(routes)

    // DELETE and PUT handlers would be here
    default:
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
    }
}


func main() {
    rdb = redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    http.HandleFunc("/api/routes", routeHandler)
    log.Println("[INFO] Starting Control Plane API on :8081")
    if err := http.ListenAndServe(":8081", nil); err != nil {
        log.Fatalf("[FATAL] API server failed to start: %v", err)
    }
}

3. 管理前端:基于 Chakra UI 的 React 仪表盘

前端的目标是提供一个清晰、无干扰的界面。Chakra UI 的组合式组件和优秀的默认样式非常适合快速搭建这类内部工具。

RouteManagement.js:

import React, { useState, useEffect } from 'react';
import {
  Box, Heading, Button, VStack, HStack,
  Table, Thead, Tbody, Tr, Th, Td,
  FormControl, FormLabel, Input, useToast,
  IconButton,
} from '@chakra-ui/react';
import { AddIcon, DeleteIcon } from '@chakra-ui/icons';
import axios from 'axios';

// The API endpoint should be configured via environment variables.
const API_URL = 'http://localhost:8081/api/routes';

function RouteManagement() {
  const [routes, setRoutes] = useState([]);
  const [path, setPath] = useState('');
  const [targets, setTargets] = useState([{ url: '', weight: 100 }]);
  const toast = useToast();

  const fetchRoutes = async () => {
    try {
      const response = await axios.get(API_URL);
      setRoutes(response.data || []);
    } catch (error) {
      toast({
        title: 'Failed to fetch routes',
        description: error.message,
        status: 'error',
        duration: 5000,
        isClosable: true,
      });
    }
  };

  useEffect(() => {
    fetchRoutes();
  }, []);

  const handleAddTarget = () => {
    setTargets([...targets, { url: '', weight: 0 }]);
  };

  const handleTargetChange = (index, field, value) => {
    const newTargets = [...targets];
    // A common mistake is to not parse integer values from input fields.
    newTargets[index][field] = field === 'weight' ? parseInt(value, 10) || 0 : value;
    setTargets(newTargets);
  };
  
  const handleRemoveTarget = (index) => {
    const newTargets = targets.filter((_, i) => i !== index);
    setTargets(newTargets);
  };
  
  const handleSubmit = async (e) => {
    e.preventDefault();
    const totalWeight = targets.reduce((sum, t) => sum + t.weight, 0);
    // In a real project, this validation is crucial.
    if (totalWeight <= 0) {
      toast({ title: 'Total weight must be greater than 0', status: 'error' });
      return;
    }
    
    const newRoute = { path, targets };
    try {
      await axios.post(API_URL, newRoute);
      toast({ title: 'Route created successfully', status: 'success' });
      fetchRoutes(); // Refresh list
      // Reset form
      setPath('');
      setTargets([{ url: '', weight: 100 }]);
    } catch (error) {
      toast({ title: 'Failed to create route', description: error.message, status: 'error' });
    }
  };

  return (
    <Box p={8}>
      <Heading mb={6}>MLOps Model Route Management</Heading>
      
      <Box as="form" onSubmit={handleSubmit} p={6} borderWidth={1} borderRadius="md" mb={8}>
        <VStack spacing={4}>
          <FormControl isRequired>
            <FormLabel>Request Path</FormLabel>
            <Input 
              placeholder="/predict/my-model" 
              value={path} 
              onChange={(e) => setPath(e.target.value)} 
            />
          </FormControl>

          {targets.map((target, index) => (
            <HStack key={index} width="100%">
              <FormControl isRequired>
                <FormLabel>Target URL {index + 1}</FormLabel>
                <Input 
                  placeholder="http://model-svc-v1:8000" 
                  value={target.url}
                  onChange={(e) => handleTargetChange(index, 'url', e.target.value)}
                />
              </FormControl>
              <FormControl isRequired width="200px">
                <FormLabel>Weight</FormLabel>
                <Input 
                  type="number" 
                  value={target.weight}
                  onChange={(e) => handleTargetChange(index, 'weight', e.target.value)}
                />
              </FormControl>
              {targets.length > 1 && (
                  <IconButton 
                    aria-label="Remove target" 
                    icon={<DeleteIcon />} 
                    onClick={() => handleRemoveTarget(index)}
                    alignSelf="flex-end"
                  />
              )}
            </HStack>
          ))}
          
          <Button leftIcon={<AddIcon />} onClick={handleAddTarget} alignSelf="flex-start">
            Add Another Target (for Canary/Shadow)
          </Button>

          <Button type="submit" colorScheme="blue" width="100%">Create Route</Button>
        </VStack>
      </Box>

      <Heading size="lg" mb={4}>Current Routes</Heading>
      <Table variant="simple">
        <Thead>
          <Tr>
            <Th>Path</Th>
            <Th>Targets</Th>
          </Tr>
        </Thead>
        <Tbody>
          {routes.map((route) => (
            <Tr key={route.id}>
              <Td>{route.path}</Td>
              <Td>
                <VStack align="start">
                {route.targets.map((t, i) => (
                  <Box key={i}>{t.url} (Weight: {t.weight})</Box>
                ))}
                </VStack>
              </Td>
            </Tr>
          ))}
        </Tbody>
      </Table>
    </Box>
  );
}

export default RouteManagement;

这段 React 代码展示了如何使用 Chakra UI 的 FormControl, Input, Table 等组件快速构建一个结构清晰的管理界面。它处理了表单状态、动态增删目标地址以及与后端 API 的交互。

架构的扩展性与局限性

这个轻量级的方案为我们未来的扩展打下了坚实的基础。例如,我们可以在 Go 代理的中间件层轻松地加入基于 JWT 的认证,或者集成 Prometheus 客户端来暴露每个路由的请求延迟和状态码指标。控制平面也可以进一步与 Gitlab CI 集成,实现模型训练流水线成功后自动注册新版本到网关。

然而,这个方案的局限性也同样明显。它并非一个通用的 API 网关。它缺少复杂的插件机制、高级的流量治理策略(如复杂的重试和熔断逻辑)以及一个庞大的社区生态。如果业务需求演进到需要复杂的 WAF 防护、GraphQL 代理或 gRPC-Web 转码等功能,那么重新评估并迁移到像 Envoy 或 APISIX 这样的成熟网关将是更明智的选择。我们当前构建的,是一个高度专注、解决特定 MLOps 痛点的内部工具,它的价值在于其简洁、高效和完全可控。


  目录