构建基于 Envoy 和 Fluentd 的高可观测性 SSG 实时更新架构


我们团队维护着一个庞大的 Monorepo,其中包含数十个基于 SSG(静态站点生成)构建的前端应用。CI/CD 流水线在高峰期会变得拥挤,构建过程漫长且状态不透明。开发者无法实时得知构建进度,只能反复刷新页面,体验极差。为了解决这个问题,我们需要一个实时、可靠且具备高可观测性的构建状态通知系统。

最初的构想很简单:一个后端服务通过 WebSocket 推送状态,前端页面接收。但随着深入思考,这个方案在可观测性和运维复杂度上暴露了问题。我们需要追踪每一条连接的生命周期,记录关键事件,并能对这些数据进行聚合分析。最终,技术选型锁定在了一套组合拳上:Server-Sent Events (SSE) 用于单向通信,Envoy Proxy 作为边缘网关处理连接并生成结构化日志,Fluentd 负责日志的收集与转发,而整个项目——包括 SSG 前端、SSE 服务和基础设施配置——都将在一个 Monorepo 中进行统一管理。

graph TD
    subgraph "客户端"
        Client[浏览器 EventSource API]
    end

    subgraph "边缘代理 (Envoy)"
        EnvoyProxy[Envoy Proxy]
    end

    subgraph "后端服务"
        SSEService[Node.js SSE 服务]
    end

    subgraph "日志管道"
        Fluentd[Fluentd]
        LogBackend[(日志后端 / ELK Stack)]
    end

    Client -- HTTP/1.1 (SSE Connection) --> EnvoyProxy
    EnvoyProxy -- HTTP/1.1 --> SSEService
    SSEService -- "data: {...}\n\n" --> EnvoyProxy
    EnvoyProxy -- "data: {...}\n\n" --> Client
    EnvoyProxy -- "生成结构化 Access Log (JSON)" --> Fluentd
    Fluentd -- "解析 & 转发" --> LogBackend

这个架构的核心在于将连接管理和业务逻辑分离,同时利用 Envoy 强大的日志能力,将可观测性作为一等公民来设计。Envoy 不仅仅是流量转发,更是我们观测数据的核心生产点。

Monorepo 项目结构搭建

在真实项目中,代码的组织方式直接影响开发和维护效率。采用 pnpm workspace 管理的 Monorepo 是最自然的选择,它能统一管理依赖,方便跨包引用和脚本执行。

我们的项目结构如下:

/our-monorepo
├── pnpm-workspace.yaml
├── package.json
├── packages/
│   ├── sse-service/      # 后端 SSE 事件推送服务 (Node.js)
│   │   ├── src/
│   │   ├── package.json
│   │   └── ...
│   └── ui-dashboard/     # 前端 SSG 应用,用于展示构建状态
│       ├── pages/
│       ├── package.json
│       └── ...
└── infra/
    ├── docker-compose.yml # 编排所有服务
    ├── envoy/
    │   └── envoy.yaml     # Envoy 代理配置
    └── fluentd/
        ├── fluent.conf    # Fluentd 配置
        └── Dockerfile     # Fluentd Docker 镜像构建文件

pnpm-workspace.yaml 文件内容很简单,用于声明工作区的范围:

packages:
  - 'packages/*'

这种结构使得前端、后端和基础设施配置(IaC)能够在一个代码库中协同演进,版本控制清晰,职责分离明确。

后端 SSE 服务实现 (sse-service)

SSE 服务是整个系统的事件源。这里我们使用 Node.js 和 Express 构建一个健壮的服务。它必须处理长连接、心跳保活以及优雅关闭。

这里的坑在于,默认的 HTTP 服务器超时会过早地断开 SSE 连接。我们必须手动禁用超时,并实现心跳机制来防止中间网络设备(如防火墙、NAT)因空闲而切断连接。

packages/sse-service/src/index.js:

import express from 'express';
import cors from 'cors';
import crypto from 'crypto';

const app = express();
const PORT = 3001;

// 存储所有活跃的客户端连接
const clients = new Map();

// 允许跨域请求
app.use(cors());
app.use(express.json());

// SSE 核心端点
app.get('/events', (req, res) => {
    // 设置 SSE 响应头
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders(); // 立即发送头信息

    const clientId = crypto.randomUUID();
    const newClient = {
        id: clientId,
        res: res,
    };
    clients.set(clientId, newClient);
    console.log(`Client connected: ${clientId}. Total clients: ${clients.size}`);

    // 发送一条欢迎消息
    sendMessage(clientId, { type: 'welcome', message: `Client ${clientId} connected.` });

    // 实现心跳机制,防止连接因空闲超时而断开
    const keepAliveInterval = setInterval(() => {
        // SSE 注释行,用于保活
        res.write(': keep-alive\n\n');
    }, 15000); // 15秒一次

    req.on('close', () => {
        clearInterval(keepAliveInterval);
        clients.delete(clientId);
        console.log(`Client disconnected: ${clientId}. Total clients: ${clients.size}`);
    });
});

// 一个用于外部触发事件的模拟端点
app.post('/notify', (req, res) => {
    const { buildId, status, message } = req.body;
    if (!buildId || !status) {
        return res.status(400).json({ error: 'buildId and status are required' });
    }

    console.log(`Broadcasting build update: ${buildId} - ${status}`);
    const eventPayload = { buildId, status, message, timestamp: new Date().toISOString() };
    broadcastMessage(eventPayload);
    
    res.status(202).json({ message: 'Notification sent' });
});

function sendMessage(clientId, data) {
    const client = clients.get(clientId);
    if (client) {
        client.res.write(`id: ${crypto.randomUUID()}\n`);
        client.res.write(`data: ${JSON.stringify(data)}\n\n`);
    }
}

function broadcastMessage(data) {
    for (const [clientId, client] of clients.entries()) {
        client.res.write(`id: ${crypto.randomUUID()}\n`);
        client.res.write(`event: build_update\n`); // 自定义事件类型
        client.res.write(`data: ${JSON.stringify(data)}\n\n`);
    }
}

const server = app.listen(PORT, () => {
    console.log(`SSE service listening on port ${PORT}`);
});

// 优雅关闭
process.on('SIGTERM', () => {
    console.log('SIGTERM signal received: closing HTTP server');
    server.close(() => {
        console.log('HTTP server closed');
        // 通知所有客户端服务器将关闭
        for (const [clientId, client] of clients.entries()) {
             client.res.end();
        }
    });
});

这个服务不仅实现了 SSE 的基本逻辑,还包括了客户端管理、心跳、自定义事件类型和优雅停机。这是生产级代码的基石。

Envoy Proxy 配置 (envoy.yaml)

这是整个架构的灵魂。Envoy 将作为流量入口,负责将 /events 的请求路由到 SSE 服务,并为每一条请求(包括长连接的建立和断开)生成详细的、结构化的 JSON 日志。

infra/envoy/envoy.yaml:

static_resources:
  listeners:
  - name: listener_0
    address:
      socket_address: { address: 0.0.0.0, port_value: 8080 }
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: ingress_http
          
          # 核心:配置结构化JSON访问日志
          access_log:
          - name: envoy.access_loggers.stdout
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.access_loggers.stream.v3.StdoutAccessLog
              log_format:
                json_format:
                  # 定义日志中需要包含的字段
                  # 这些字段对于追踪和调试至关重要
                  request_id: "%REQ(X-REQUEST-ID)%"
                  authority: "%REQ(:AUTHORITY)%"
                  path: "%REQ(X-ENVOY-ORIGINAL-PATH?:PATH)%"
                  method: "%REQ(:METHOD)%"
                  protocol: "%PROTOCOL%"
                  response_code: "%RESPONSE_CODE%"
                  response_flags: "%RESPONSE_FLAGS%"
                  bytes_received: "%BYTES_RECEIVED%"
                  bytes_sent: "%BYTES_SENT%"
                  duration_ms: "%DURATION%"
                  downstream_remote_address: "%DOWNSTREAM_REMOTE_ADDRESS%"
                  upstream_host: "%UPSTREAM_HOST%"
                  user_agent: "%REQ(USER-AGENT)%"
                  start_time: "%START_TIME(%Y-%m-%dT%H:%M:%S%z)%"
          
          # 生成 x-request-id,用于全链路追踪
          request_id_extension:
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.request_id.uuid.v3.UuidRequestIdConfig
          
          route_config:
            name: local_route
            virtual_hosts:
            - name: local_service
              domains: ["*"]
              routes:
              - match: { prefix: "/events" }
                route:
                  # 对于SSE长连接,需要设置一个较长或无限的超时时间
                  # 这里的坑在于,如果不设置,默认的15秒超时会强制断开连接
                  timeout: 0s 
                  idle_timeout: 0s # 禁用空闲超时
                  cluster: sse_service_cluster
              - match: { prefix: "/" } # 其他普通请求
                route:
                  cluster: sse_service_cluster
                  timeout: 5s # 普通请求使用默认超时
              cors:
                allow_origin_string_match:
                  - prefix: "*"
                allow_methods: GET, POST, OPTIONS
                allow_headers: "*"

          http_filters:
          - name: envoy.filters.http.cors
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.cors.v3.Cors
          - name: envoy.filters.http.router
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
  
  clusters:
  - name: sse_service_cluster
    connect_timeout: 1s
    type: STRICT_DNS
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: sse_service_cluster
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                # 使用 Docker Compose 的服务名进行服务发现
                address: sse-service
                port_value: 3001

这份配置的关键点:

  1. access_log: 使用 json_format 定义了详细的日志结构。%RESPONSE_FLAGS% 尤其重要,它可以告诉我们连接是正常关闭还是被上游/下游异常终止(例如 DC - Downstream Connection termination)。
  2. request_id_extension: 自动为每条请求注入 x-request-id,这是实现分布式追踪的第一步。
  3. timeoutidle_timeout: 为 /events 路由显式设置为 0s(永不超时),确保了 SSE 长连接的稳定性。这是一个常见的错误来源,很多开发者会忘记调整默认的短超时。

Fluentd 日志处理 (fluent.conf)

Fluentd 的角色是接收 Envoy 输出的 JSON 日志,进行解析、可能的丰富化处理,然后发送到最终的存储后端。在这个例子中,我们只将其格式化后打印到标准输出,但在真实项目中,输出目标会是 Elasticsearch、Splunk 或其他日志分析平台。

infra/fluentd/Dockerfile:

FROM fluent/fluentd:v1.16-1
USER root
RUN gem install fluent-plugin-json-in-json
USER fluent

我们需要一个额外的插件 fluent-plugin-json-in-json 来解析嵌在 JSON 字符串中的 JSON。

infra/fluentd/fluent.conf:

# Source: 监听来自 Envoy 的日志
# 这里使用 forward 协议,Envoy 的 logging driver 将会把日志推送到这里
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

# Filter: 解析 Docker logging driver 包裹的 JSON
# Docker 发送的日志格式是 {"log":"...", "source":"stdout", ...}
# 我们需要提取 log 字段的内容
<filter **>
  @type parser
  key_name log
  reserve_data true # 保留原始字段
  <parse>
    @type json
  </parse>
</filter>

# Filter: 添加一些元数据,便于后续查询
# 例如,添加服务来源
<filter **>
  @type record_transformer
  <record>
    service_name "envoy-proxy"
    hostname "#{Socket.gethostname}"
  </record>
</filter>

# Output: 将处理后的日志打印到控制台,以便调试
# 在生产环境中,这里会替换为 elasticsearch, kafka 或 s3 插件
<match **>
  @type stdout
</match>

这个配置展示了一个简单的日志处理流水线:接收数据 -> 解析嵌套 JSON -> 丰富数据 -> 输出。Fluentd 强大的插件生态系统使得对接任何后端都变得非常容易。

前端 SSG 应用 (ui-dashboard)

前端应用使用 EventSource API 来订阅 SSE 事件。它的实现非常简单直接,并且内置了自动重连机制,比 WebSocket 的手动实现要省心。

假设我们有一个 Next.js 或 Astro 的页面组件 BuildStatus.jsx:

import React, { useState, useEffect } from 'react';

const BuildStatus = () => {
  const [updates, setUpdates] = useState([]);
  const [isConnected, setIsConnected] = useState(false);

  useEffect(() => {
    // Envoy 的地址
    const eventSource = new EventSource('http://localhost:8080/events');
    
    eventSource.onopen = () => {
      console.log('SSE connection opened.');
      setIsConnected(true);
    };

    // 监听自定义的 'build_update' 事件
    eventSource.addEventListener('build_update', (event) => {
      const newUpdate = JSON.parse(event.data);
      console.log('Received build update:', newUpdate);
      setUpdates(prevUpdates => [newUpdate, ...prevUpdates]);
    });

    eventSource.onerror = (err) => {
      console.error('EventSource failed:', err);
      setIsConnected(false);
      // EventSource 会自动尝试重连,无需手动干预
    };

    // 组件卸载时关闭连接
    return () => {
      console.log('Closing SSE connection.');
      eventSource.close();
    };
  }, []); // 空依赖数组确保 effect 只运行一次

  return (
    <div>
      <h1>Build Status Dashboard</h1>
      <p>Connection Status: {isConnected ? 'Connected' : 'Disconnected'}</p>
      <ul>
        {updates.map((update, index) => (
          <li key={index}>
            <strong>Build {update.buildId}</strong>: {update.status} - {update.message}
            <small> @ {update.timestamp}</small>
          </li>
        ))}
      </ul>
    </div>
  );
};

export default BuildStatus;

整合与运行 (docker-compose.yml)

最后,我们用 Docker Compose 将所有部分串联起来,形成一个完整的、可运行的环境。

infra/docker-compose.yml:

version: '3.8'

services:
  sse-service:
    build:
      context: ../packages/sse-service
    container_name: sse-service
    ports:
      - "3001:3001"
    networks:
      - app-net

  envoy:
    image: envoyproxy/envoy:v1.27.0
    container_name: envoy
    volumes:
      - ./envoy/envoy.yaml:/etc/envoy/envoy.yaml
    ports:
      - "8080:8080" # 客户端访问入口
      - "9901:9901" # Envoy 管理界面
    networks:
      - app-net
    depends_on:
      - sse-service
    # 核心:将 Envoy 的 stdout 日志通过 fluentd driver 发送出去
    logging:
      driver: "fluentd"
      options:
        fluentd-address: "localhost:24224"
        tag: "envoy.access"

  fluentd:
    build:
      context: ./fluentd
    container_name: fluentd
    volumes:
      - ./fluentd/fluent.conf:/fluentd/etc/fluent.conf
    ports:
      - "24224:24224"
      - "24224:24224/udp"
    networks:
      - app-net

networks:
  app-net:
    driver: bridge

通过 logging.driver: "fluentd",我们指示 Docker 将 envoy 容器的标准输出直接转发给 fluentd 服务,完成了日志管道的闭环。

启动整个栈 (docker-compose up),然后访问前端页面,并从另一个终端向 SSE 服务发送通知 (curl -X POST -H "Content-Type: application/json" -d '{"buildId": "abc-123", "status": "RUNNING", "message": "Compiling assets..."}' http://localhost:8080/notify),你将看到页面实时更新。同时,在 Fluentd 容器的日志中,会看到格式化后的、包含 x-request-id 的访问日志,每一条连接的建立和数据交互都尽在掌握。

当前方案的局限性

尽管此架构解决了可观测性和实时性的核心问题,但它并非银弹。当前的 SSE 服务是单点的,无法水平扩展。如果需要支持大量并发连接,必须引入一个共享的后端,例如 Redis Pub/Sub,让所有 SSE 服务实例都能从中订阅事件并广播给各自连接的客户端。此外,日志最终需要持久化到专用的存储和分析系统(如 ELK Stack)中,并建立相应的仪表盘和告警,才能真正发挥其价值。最后,对于极端高扇出的场景,Envoy 本身的资源消耗也需要被纳入容量规划的考量之中。


  目录