我们团队维护着一个庞大的 Monorepo,其中包含数十个基于 SSG(静态站点生成)构建的前端应用。CI/CD 流水线在高峰期会变得拥挤,构建过程漫长且状态不透明。开发者无法实时得知构建进度,只能反复刷新页面,体验极差。为了解决这个问题,我们需要一个实时、可靠且具备高可观测性的构建状态通知系统。
最初的构想很简单:一个后端服务通过 WebSocket 推送状态,前端页面接收。但随着深入思考,这个方案在可观测性和运维复杂度上暴露了问题。我们需要追踪每一条连接的生命周期,记录关键事件,并能对这些数据进行聚合分析。最终,技术选型锁定在了一套组合拳上:Server-Sent Events (SSE)
用于单向通信,Envoy Proxy
作为边缘网关处理连接并生成结构化日志,Fluentd
负责日志的收集与转发,而整个项目——包括 SSG 前端、SSE 服务和基础设施配置——都将在一个 Monorepo
中进行统一管理。
graph TD subgraph "客户端" Client[浏览器 EventSource API] end subgraph "边缘代理 (Envoy)" EnvoyProxy[Envoy Proxy] end subgraph "后端服务" SSEService[Node.js SSE 服务] end subgraph "日志管道" Fluentd[Fluentd] LogBackend[(日志后端 / ELK Stack)] end Client -- HTTP/1.1 (SSE Connection) --> EnvoyProxy EnvoyProxy -- HTTP/1.1 --> SSEService SSEService -- "data: {...}\n\n" --> EnvoyProxy EnvoyProxy -- "data: {...}\n\n" --> Client EnvoyProxy -- "生成结构化 Access Log (JSON)" --> Fluentd Fluentd -- "解析 & 转发" --> LogBackend
这个架构的核心在于将连接管理和业务逻辑分离,同时利用 Envoy 强大的日志能力,将可观测性作为一等公民来设计。Envoy 不仅仅是流量转发,更是我们观测数据的核心生产点。
Monorepo 项目结构搭建
在真实项目中,代码的组织方式直接影响开发和维护效率。采用 pnpm workspace 管理的 Monorepo 是最自然的选择,它能统一管理依赖,方便跨包引用和脚本执行。
我们的项目结构如下:
/our-monorepo
├── pnpm-workspace.yaml
├── package.json
├── packages/
│ ├── sse-service/ # 后端 SSE 事件推送服务 (Node.js)
│ │ ├── src/
│ │ ├── package.json
│ │ └── ...
│ └── ui-dashboard/ # 前端 SSG 应用,用于展示构建状态
│ ├── pages/
│ ├── package.json
│ └── ...
└── infra/
├── docker-compose.yml # 编排所有服务
├── envoy/
│ └── envoy.yaml # Envoy 代理配置
└── fluentd/
├── fluent.conf # Fluentd 配置
└── Dockerfile # Fluentd Docker 镜像构建文件
pnpm-workspace.yaml
文件内容很简单,用于声明工作区的范围:
packages:
- 'packages/*'
这种结构使得前端、后端和基础设施配置(IaC)能够在一个代码库中协同演进,版本控制清晰,职责分离明确。
后端 SSE 服务实现 (sse-service
)
SSE 服务是整个系统的事件源。这里我们使用 Node.js 和 Express 构建一个健壮的服务。它必须处理长连接、心跳保活以及优雅关闭。
这里的坑在于,默认的 HTTP 服务器超时会过早地断开 SSE 连接。我们必须手动禁用超时,并实现心跳机制来防止中间网络设备(如防火墙、NAT)因空闲而切断连接。
packages/sse-service/src/index.js
:
import express from 'express';
import cors from 'cors';
import crypto from 'crypto';
const app = express();
const PORT = 3001;
// 存储所有活跃的客户端连接
const clients = new Map();
// 允许跨域请求
app.use(cors());
app.use(express.json());
// SSE 核心端点
app.get('/events', (req, res) => {
// 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders(); // 立即发送头信息
const clientId = crypto.randomUUID();
const newClient = {
id: clientId,
res: res,
};
clients.set(clientId, newClient);
console.log(`Client connected: ${clientId}. Total clients: ${clients.size}`);
// 发送一条欢迎消息
sendMessage(clientId, { type: 'welcome', message: `Client ${clientId} connected.` });
// 实现心跳机制,防止连接因空闲超时而断开
const keepAliveInterval = setInterval(() => {
// SSE 注释行,用于保活
res.write(': keep-alive\n\n');
}, 15000); // 15秒一次
req.on('close', () => {
clearInterval(keepAliveInterval);
clients.delete(clientId);
console.log(`Client disconnected: ${clientId}. Total clients: ${clients.size}`);
});
});
// 一个用于外部触发事件的模拟端点
app.post('/notify', (req, res) => {
const { buildId, status, message } = req.body;
if (!buildId || !status) {
return res.status(400).json({ error: 'buildId and status are required' });
}
console.log(`Broadcasting build update: ${buildId} - ${status}`);
const eventPayload = { buildId, status, message, timestamp: new Date().toISOString() };
broadcastMessage(eventPayload);
res.status(202).json({ message: 'Notification sent' });
});
function sendMessage(clientId, data) {
const client = clients.get(clientId);
if (client) {
client.res.write(`id: ${crypto.randomUUID()}\n`);
client.res.write(`data: ${JSON.stringify(data)}\n\n`);
}
}
function broadcastMessage(data) {
for (const [clientId, client] of clients.entries()) {
client.res.write(`id: ${crypto.randomUUID()}\n`);
client.res.write(`event: build_update\n`); // 自定义事件类型
client.res.write(`data: ${JSON.stringify(data)}\n\n`);
}
}
const server = app.listen(PORT, () => {
console.log(`SSE service listening on port ${PORT}`);
});
// 优雅关闭
process.on('SIGTERM', () => {
console.log('SIGTERM signal received: closing HTTP server');
server.close(() => {
console.log('HTTP server closed');
// 通知所有客户端服务器将关闭
for (const [clientId, client] of clients.entries()) {
client.res.end();
}
});
});
这个服务不仅实现了 SSE 的基本逻辑,还包括了客户端管理、心跳、自定义事件类型和优雅停机。这是生产级代码的基石。
Envoy Proxy 配置 (envoy.yaml
)
这是整个架构的灵魂。Envoy 将作为流量入口,负责将 /events
的请求路由到 SSE 服务,并为每一条请求(包括长连接的建立和断开)生成详细的、结构化的 JSON 日志。
infra/envoy/envoy.yaml
:
static_resources:
listeners:
- name: listener_0
address:
socket_address: { address: 0.0.0.0, port_value: 8080 }
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: ingress_http
# 核心:配置结构化JSON访问日志
access_log:
- name: envoy.access_loggers.stdout
typed_config:
"@type": type.googleapis.com/envoy.extensions.access_loggers.stream.v3.StdoutAccessLog
log_format:
json_format:
# 定义日志中需要包含的字段
# 这些字段对于追踪和调试至关重要
request_id: "%REQ(X-REQUEST-ID)%"
authority: "%REQ(:AUTHORITY)%"
path: "%REQ(X-ENVOY-ORIGINAL-PATH?:PATH)%"
method: "%REQ(:METHOD)%"
protocol: "%PROTOCOL%"
response_code: "%RESPONSE_CODE%"
response_flags: "%RESPONSE_FLAGS%"
bytes_received: "%BYTES_RECEIVED%"
bytes_sent: "%BYTES_SENT%"
duration_ms: "%DURATION%"
downstream_remote_address: "%DOWNSTREAM_REMOTE_ADDRESS%"
upstream_host: "%UPSTREAM_HOST%"
user_agent: "%REQ(USER-AGENT)%"
start_time: "%START_TIME(%Y-%m-%dT%H:%M:%S%z)%"
# 生成 x-request-id,用于全链路追踪
request_id_extension:
typed_config:
"@type": type.googleapis.com/envoy.extensions.request_id.uuid.v3.UuidRequestIdConfig
route_config:
name: local_route
virtual_hosts:
- name: local_service
domains: ["*"]
routes:
- match: { prefix: "/events" }
route:
# 对于SSE长连接,需要设置一个较长或无限的超时时间
# 这里的坑在于,如果不设置,默认的15秒超时会强制断开连接
timeout: 0s
idle_timeout: 0s # 禁用空闲超时
cluster: sse_service_cluster
- match: { prefix: "/" } # 其他普通请求
route:
cluster: sse_service_cluster
timeout: 5s # 普通请求使用默认超时
cors:
allow_origin_string_match:
- prefix: "*"
allow_methods: GET, POST, OPTIONS
allow_headers: "*"
http_filters:
- name: envoy.filters.http.cors
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.cors.v3.Cors
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
clusters:
- name: sse_service_cluster
connect_timeout: 1s
type: STRICT_DNS
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: sse_service_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
# 使用 Docker Compose 的服务名进行服务发现
address: sse-service
port_value: 3001
这份配置的关键点:
-
access_log
: 使用json_format
定义了详细的日志结构。%RESPONSE_FLAGS%
尤其重要,它可以告诉我们连接是正常关闭还是被上游/下游异常终止(例如DC
- Downstream Connection termination)。 -
request_id_extension
: 自动为每条请求注入x-request-id
,这是实现分布式追踪的第一步。 -
timeout
和idle_timeout
: 为/events
路由显式设置为0s
(永不超时),确保了 SSE 长连接的稳定性。这是一个常见的错误来源,很多开发者会忘记调整默认的短超时。
Fluentd 日志处理 (fluent.conf
)
Fluentd 的角色是接收 Envoy 输出的 JSON 日志,进行解析、可能的丰富化处理,然后发送到最终的存储后端。在这个例子中,我们只将其格式化后打印到标准输出,但在真实项目中,输出目标会是 Elasticsearch、Splunk 或其他日志分析平台。
infra/fluentd/Dockerfile
:
FROM fluent/fluentd:v1.16-1
USER root
RUN gem install fluent-plugin-json-in-json
USER fluent
我们需要一个额外的插件 fluent-plugin-json-in-json
来解析嵌在 JSON 字符串中的 JSON。
infra/fluentd/fluent.conf
:
# Source: 监听来自 Envoy 的日志
# 这里使用 forward 协议,Envoy 的 logging driver 将会把日志推送到这里
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
# Filter: 解析 Docker logging driver 包裹的 JSON
# Docker 发送的日志格式是 {"log":"...", "source":"stdout", ...}
# 我们需要提取 log 字段的内容
<filter **>
@type parser
key_name log
reserve_data true # 保留原始字段
<parse>
@type json
</parse>
</filter>
# Filter: 添加一些元数据,便于后续查询
# 例如,添加服务来源
<filter **>
@type record_transformer
<record>
service_name "envoy-proxy"
hostname "#{Socket.gethostname}"
</record>
</filter>
# Output: 将处理后的日志打印到控制台,以便调试
# 在生产环境中,这里会替换为 elasticsearch, kafka 或 s3 插件
<match **>
@type stdout
</match>
这个配置展示了一个简单的日志处理流水线:接收数据 -> 解析嵌套 JSON -> 丰富数据 -> 输出。Fluentd 强大的插件生态系统使得对接任何后端都变得非常容易。
前端 SSG 应用 (ui-dashboard
)
前端应用使用 EventSource
API 来订阅 SSE 事件。它的实现非常简单直接,并且内置了自动重连机制,比 WebSocket 的手动实现要省心。
假设我们有一个 Next.js 或 Astro 的页面组件 BuildStatus.jsx
:
import React, { useState, useEffect } from 'react';
const BuildStatus = () => {
const [updates, setUpdates] = useState([]);
const [isConnected, setIsConnected] = useState(false);
useEffect(() => {
// Envoy 的地址
const eventSource = new EventSource('http://localhost:8080/events');
eventSource.onopen = () => {
console.log('SSE connection opened.');
setIsConnected(true);
};
// 监听自定义的 'build_update' 事件
eventSource.addEventListener('build_update', (event) => {
const newUpdate = JSON.parse(event.data);
console.log('Received build update:', newUpdate);
setUpdates(prevUpdates => [newUpdate, ...prevUpdates]);
});
eventSource.onerror = (err) => {
console.error('EventSource failed:', err);
setIsConnected(false);
// EventSource 会自动尝试重连,无需手动干预
};
// 组件卸载时关闭连接
return () => {
console.log('Closing SSE connection.');
eventSource.close();
};
}, []); // 空依赖数组确保 effect 只运行一次
return (
<div>
<h1>Build Status Dashboard</h1>
<p>Connection Status: {isConnected ? 'Connected' : 'Disconnected'}</p>
<ul>
{updates.map((update, index) => (
<li key={index}>
<strong>Build {update.buildId}</strong>: {update.status} - {update.message}
<small> @ {update.timestamp}</small>
</li>
))}
</ul>
</div>
);
};
export default BuildStatus;
整合与运行 (docker-compose.yml
)
最后,我们用 Docker Compose 将所有部分串联起来,形成一个完整的、可运行的环境。
infra/docker-compose.yml
:
version: '3.8'
services:
sse-service:
build:
context: ../packages/sse-service
container_name: sse-service
ports:
- "3001:3001"
networks:
- app-net
envoy:
image: envoyproxy/envoy:v1.27.0
container_name: envoy
volumes:
- ./envoy/envoy.yaml:/etc/envoy/envoy.yaml
ports:
- "8080:8080" # 客户端访问入口
- "9901:9901" # Envoy 管理界面
networks:
- app-net
depends_on:
- sse-service
# 核心:将 Envoy 的 stdout 日志通过 fluentd driver 发送出去
logging:
driver: "fluentd"
options:
fluentd-address: "localhost:24224"
tag: "envoy.access"
fluentd:
build:
context: ./fluentd
container_name: fluentd
volumes:
- ./fluentd/fluent.conf:/fluentd/etc/fluent.conf
ports:
- "24224:24224"
- "24224:24224/udp"
networks:
- app-net
networks:
app-net:
driver: bridge
通过 logging.driver: "fluentd"
,我们指示 Docker 将 envoy
容器的标准输出直接转发给 fluentd
服务,完成了日志管道的闭环。
启动整个栈 (docker-compose up
),然后访问前端页面,并从另一个终端向 SSE 服务发送通知 (curl -X POST -H "Content-Type: application/json" -d '{"buildId": "abc-123", "status": "RUNNING", "message": "Compiling assets..."}' http://localhost:8080/notify
),你将看到页面实时更新。同时,在 Fluentd 容器的日志中,会看到格式化后的、包含 x-request-id
的访问日志,每一条连接的建立和数据交互都尽在掌握。
当前方案的局限性
尽管此架构解决了可观测性和实时性的核心问题,但它并非银弹。当前的 SSE 服务是单点的,无法水平扩展。如果需要支持大量并发连接,必须引入一个共享的后端,例如 Redis Pub/Sub,让所有 SSE 服务实例都能从中订阅事件并广播给各自连接的客户端。此外,日志最终需要持久化到专用的存储和分析系统(如 ELK Stack)中,并建立相应的仪表盘和告警,才能真正发挥其价值。最后,对于极端高扇出的场景,Envoy 本身的资源消耗也需要被纳入容量规划的考量之中。