我们面临一个具体的工程挑战:一个提供语义搜索服务的线上系统,其核心是一个基于sentence-transformers
的向量嵌入模型。当算法团队产出一个效果更优的新版模型时,我们必须将其上线,并用新模型重新索引数据库中数百万份文档。整个过程对用户的要求是完全无感知——不能有服务中断,不能有搜索结果的暂时性降级或不一致。这意味着简单的停机维护或“先删后建”的索引策略是不可接受的。
问题的核心在于如何编排一个涉及模型服务更新、大规模数据重处理、搜索引擎索引切换和前端应用部署的复杂工作流,并将其完全自动化,确保流程的原子性和可靠性。
初步构想与架构设计
为了解决这个问题,我们设计了一套由 CircleCI 驱动的自动化 MLOps 流程。整个系统的架构由以下几个关键组件构成:
- 模型服务 (Model Service): 一个轻量级的 FastAPI 应用,负责加载新的嵌入模型,并提供一个 RESTful API 接口,用于将文本转换为向量。该服务被打包成 Docker 镜像。
- 向量搜索引擎 (Vector Search Engine): 使用 OpenSearch。它的 k-NN 插件提供了高效的向量检索能力,而其“索引别名 (Index Alias)”功能是实现零停机切换的核心。
- 前端应用 (Frontend): 一个基于 Qwik 构建的单页面应用。Qwik 的可恢复性(Resumability)特性使得即使用户在部署期间正在使用应用,也能获得平滑的体验。
- CI/CD 管道 (CI/CD Pipeline): 使用 CircleCI 来自动化整个部署和更新流程。
整个零停机更新的流程被设计为以下几个步骤,并通过 Mermaid 图清晰地展示出来:
graph TD subgraph "CircleCI Workflow" A[Developer pushes new model to Git] --> B{Start Workflow}; B --> C[1. Test Qwik Frontend]; B --> D[2. Build & Push Model Service Image]; D --> E[3. Deploy New Model Service]; E --> F[4. Run Re-indexing Job]; F --> G[5. Atomically Switch OpenSearch Alias]; C --> H[6. Build & Deploy Qwik Frontend]; G --> H; end subgraph "OpenSearch" OS1(Index: documents_v1) OS2(Index: documents_v2) Alias(Alias: semantic_search) F -- Creates & Populates --> OS2 G -- Points to --> OS2 Alias -- Initially points to --> OS1 end subgraph "Production Environment" User(User) --> App(Qwik App); App --> SearchAPI(Search API); SearchAPI -- Queries via --> Alias; end style F fill:#f9f,stroke:#333,stroke-width:2px style G fill:#f9f,stroke:#333,stroke-width:2px
这里的关键决策是利用 OpenSearch 的索引别名。应用层代码永远只与别名 semantic_search
交互。更新时,我们创建一个全新的索引(如 documents_v2
),在后台用新模型生成向量并填充数据。数据填充完毕后,通过一个原子操作将别名从旧索引 documents_v1
指向新索引 documents_v2
。这个切换是瞬时的,对查询流量毫无影响。
核心实现细节
1. 模型服务与容器化
模型服务非常直接,它使用 FastAPI 和 sentence-transformers
库。
model_service/app/main.py
:
import os
import logging
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from sentence_transformers import SentenceTransformer
from typing import List
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 从环境变量加载模型名称,提供默认值
MODEL_NAME = os.getenv("MODEL_NAME", "all-MiniLM-L6-v2")
# 在真实项目中,这里可能是从模型注册表获取的模型路径
MODEL_PATH = f"./models/{MODEL_NAME}"
# 全局加载模型,避免每次请求都重新加载
# 这是一种常见的优化,但在 serverless 环境下需要注意冷启动问题
try:
if not os.path.exists(MODEL_PATH):
logging.info(f"Model not found locally at {MODEL_PATH}. Downloading {MODEL_NAME} from Hugging Face Hub...")
# 预下载模型到指定路径
model = SentenceTransformer(MODEL_NAME)
model.save(MODEL_PATH)
else:
logging.info(f"Loading model from local path: {MODEL_PATH}")
model = SentenceTransformer(MODEL_PATH)
logging.info(f"Model {MODEL_NAME} loaded successfully.")
except Exception as e:
logging.error(f"Failed to load model: {e}")
# 如果模型加载失败,应用无法正常工作,应直接退出
raise RuntimeError(f"Could not load SentenceTransformer model: {e}") from e
app = FastAPI(title="Embedding Model Service")
class EmbeddingRequest(BaseModel):
texts: List[str] = Field(..., min_items=1, max_items=128, description="List of texts to embed.")
class EmbeddingResponse(BaseModel):
embeddings: List[List[float]]
model: str = MODEL_NAME
@app.post("/v1/embed", response_model=EmbeddingResponse)
async def get_embeddings(request: EmbeddingRequest):
"""
接收文本列表,返回对应的向量嵌入。
"""
try:
# SentenceTransformer 的 encode 方法是线程安全的
embeddings = model.encode(request.texts, convert_to_tensor=False).tolist()
return EmbeddingResponse(embeddings=embeddings)
except Exception as e:
logging.error(f"Error during embedding encoding: {e}", exc_info=True)
# 捕获潜在的运行时错误,例如输入数据格式问题
raise HTTPException(status_code=500, detail="Internal server error during embedding.")
@app.get("/health")
def health_check():
"""
简单的健康检查端点,用于负载均衡器或K8s的存活探针。
"""
return {"status": "ok", "model": MODEL_NAME}
相应的 Dockerfile
保证了环境的一致性。
model_service/Dockerfile
:
# 使用官方提供的包含CUDA支持的镜像,如果需要在GPU上运行
# FROM python:3.9-slim
# 对于CPU推理,slim镜像足够
FROM python:3.9-slim
WORKDIR /app
# 环境变量,用于指定模型
ARG MODEL_NAME=all-MiniLM-L6-v2
ENV MODEL_NAME=${MODEL_NAME}
# 安装依赖
# --no-cache-dir 减小镜像体积
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY ./app /app/app
# 在构建时预下载模型,而不是在容器启动时
# 这样可以加速容器启动,并将模型作为镜像的一部分
RUN python -c "from sentence_transformers import SentenceTransformer; model = SentenceTransformer('${MODEL_NAME}'); model.save('./models/${MODEL_NAME}')"
EXPOSE 8000
# 使用 gunicorn 作为生产级 WSGI 服务器
# workers 的数量通常设置为 (2 * CPU核心数) + 1
# --timeout 设为 60 秒,防止处理长文本时超时
CMD ["gunicorn", "-k", "uvicorn.workers.UvicornWorker", "-w", "4", "-b", "0.0.0.0:8000", "--timeout", "60", "app.main:app"]
2. OpenSearch 索引模板与零停机重索引脚本
这是实现零停机更新的核心。首先,我们定义一个索引模板,确保所有新创建的文档索引都具备我们需要的 k-NN 向量字段映射。
opensearch_utils/template.json
:
{
"index_patterns": ["documents_*"],
"template": {
"settings": {
"index.knn": true,
"number_of_shards": 2,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"doc_id": { "type": "keyword" },
"content": { "type": "text" },
"embedding": {
"type": "knn_vector",
"dimension": 384,
"method": {
"name": "hnsw",
"space_type": "l2",
"engine": "nmslib",
"parameters": {
"ef_construction": 256,
"m": 48
}
}
}
}
}
}
}
- 注意:
dimension
必须与你使用的模型输出维度相匹配 (all-MiniLM-L6-v2
是 384)。
然后,我们编写一个 Python 脚本 reindex.py
,它将被 CircleCI 调用。这个脚本封装了整个重索引和别名切换的逻辑。
opensearch_utils/reindex.py
:
import os
import time
import httpx
import logging
from datetime import datetime
from opensearchpy import OpenSearch, helpers
# 配置
OPENSEARCH_HOST = os.getenv("OPENSEARCH_HOST")
OPENSEARCH_USER = os.getenv("OPENSEARCH_USER")
OPENSEARCH_PASSWORD = os.getenv("OPENSEARCH_PASSWORD")
MODEL_SERVICE_URL = os.getenv("MODEL_SERVICE_URL") # 新部署的模型服务URL
ALIAS_NAME = "semantic_search"
INDEX_PREFIX = "documents"
BATCH_SIZE = 100 # 批量处理文档以提高效率
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 模拟从主数据库获取文档
def get_all_documents_from_source():
# 在真实项目中,这里会连接 PostgreSQL, MongoDB 等数据库
# 并拉取所有需要被索引的文档。
# 为演示,我们使用硬编码的数据。
docs = [
{"doc_id": "doc_001", "content": "Qwik is a resumable web framework for building instant-loading web applications."},
{"doc_id": "doc_002", "content": "OpenSearch is a community-driven, open-source search and analytics suite."},
{"doc_id": "doc_003", "content": "Vitest is a blazing fast unit test framework powered by Vite."},
{"doc_id": "doc_004", "content": "CircleCI automates the build, test, and deployment of software."},
# ... 假设这里有数百万条文档
]
for doc in docs:
yield doc
def get_embeddings_batch(texts: list[str]):
"""调用模型服务获取批量文本的向量"""
try:
with httpx.Client(timeout=60.0) as client:
response = client.post(f"{MODEL_SERVICE_URL}/v1/embed", json={"texts": texts})
response.raise_for_status()
return response.json()["embeddings"]
except httpx.RequestError as e:
logging.error(f"Failed to call model service: {e}")
return None
def generate_actions_for_bulk_indexing(new_index_name):
"""
生成器,用于OpenSearch的批量索引操作
"""
docs_batch = []
contents_batch = []
for doc in get_all_documents_from_source():
docs_batch.append(doc)
contents_batch.append(doc["content"])
if len(docs_batch) >= BATCH_SIZE:
embeddings = get_embeddings_batch(contents_batch)
if embeddings:
for i, doc_item in enumerate(docs_batch):
yield {
"_op_type": "index",
"_index": new_index_name,
"_id": doc_item["doc_id"],
"_source": {
"doc_id": doc_item["doc_id"],
"content": doc_item["content"],
"embedding": embeddings[i]
}
}
docs_batch, contents_batch = [], []
# 处理最后一批不足BATCH_SIZE的文档
if docs_batch:
embeddings = get_embeddings_batch(contents_batch)
if embeddings:
for i, doc_item in enumerate(docs_batch):
yield {
"_op_type": "index",
"_index": new_index_name,
"_id": doc_item["doc_id"],
"_source": {
"doc_id": doc_item["doc_id"],
"content": doc_item["content"],
"embedding": embeddings[i]
}
}
def main():
client = OpenSearch(
hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}],
http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD),
use_ssl=True,
verify_certs=True,
ssl_assert_hostname=False,
ssl_show_warn=False,
)
# 1. 创建一个新的索引,名称包含时间戳以保证唯一性
timestamp = datetime.utcnow().strftime("%Y%m%d%H%M%S")
new_index_name = f"{INDEX_PREFIX}_v{timestamp}"
logging.info(f"Creating new index: {new_index_name}")
# 模板会自动应用,所以这里不需要指定mapping
client.indices.create(index=new_index_name)
# 2. 填充新索引
logging.info(f"Start populating index {new_index_name}...")
success_count, _ = helpers.bulk(client, generate_actions_for_bulk_indexing(new_index_name))
logging.info(f"Successfully indexed {success_count} documents.")
# 强制刷新,确保所有文档都可被搜索
client.indices.refresh(index=new_index_name)
# 3. 原子地切换别名
logging.info(f"Switching alias '{ALIAS_NAME}' to point to '{new_index_name}'")
actions = {
"actions": [
# 移除别名在所有旧索引上的指向 (如果有)
{"remove": {"index": f"{INDEX_PREFIX}_*", "alias": ALIAS_NAME}},
# 将别名指向新索引
{"add": {"index": new_index_name, "alias": ALIAS_NAME}}
]
}
client.indices.update_aliases(body=actions)
logging.info("Alias switched successfully.")
# 4. (可选) 清理旧索引
# 在生产环境中,我们会保留几个旧版本以备回滚,并设置一个单独的清理任务
all_indices = client.indices.get(index=f"{INDEX_PREFIX}_*")
for index_name in all_indices:
if index_name != new_index_name:
# 检查该索引是否已不再被别名使用
if not client.indices.exists_alias(name=ALIAS_NAME, index=index_name):
logging.info(f"Deleting old index: {index_name}")
client.indices.delete(index=index_name)
if __name__ == "__main__":
main()
这个脚本是整个流程的“心脏”,它保证了数据处理的幂等性和切换的原子性。
3. Qwik 前端与 Vitest 测试
Qwik 前端负责用户交互。我们创建一个 routeLoader$
来获取搜索结果。
src/routes/search/index.tsx
:
import { component$, useSignal, useTask$ } from '@builder.io/qwik';
import { routeLoader$, Form } from '@builder.io/qwik-city';
import { server$ } from '@builder.io/qwik-city';
// 在真实项目中,这个URL应该是通过环境变量配置的
const SEARCH_API_URL = 'https://api.example.com/search';
// 定义数据类型
interface SearchResult {
doc_id: string;
content: string;
score: number;
}
// 服务端函数,用于执行搜索。这确保了API密钥等不会暴露到客户端
const performSearch = server$(async (query: string): Promise<SearchResult[]> => {
if (!query) {
return [];
}
// 这是后端API,它会调用模型服务和OpenSearch
const response = await fetch(SEARCH_API_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ query }),
});
if (!response.ok) {
console.error("Search API request failed");
return [];
}
const data = await response.json();
return data.results || [];
});
export const useSearchResults = routeLoader$(() => {
// 初始加载时为空
return [] as SearchResult[];
});
export default component$(() => {
const query = useSignal('');
const results = useSignal<SearchResult[]>([]);
const isLoading = useSignal(false);
// 当用户停止输入500ms后执行搜索
useTask$(({ track, cleanup }) => {
track(() => query.value);
const debounceTimer = setTimeout(async () => {
if (query.value.trim().length > 2) {
isLoading.value = true;
const searchResults = await performSearch(query.value);
results.value = searchResults;
isLoading.value = false;
} else {
results.value = [];
}
}, 500);
cleanup(() => clearTimeout(debounceTimer));
});
return (
<div>
<h1>Semantic Search</h1>
<input
type="search"
placeholder="Search documents..."
bind:value={query}
class="search-input"
/>
{isLoading.value && <p>Searching...</p>}
<ul class="results-list">
{results.value.map((res) => (
<li key={res.doc_id}>
<strong>Score: {res.score.toFixed(4)}</strong>
<p>{res.content}</p>
</li>
))}
</ul>
</div>
);
});
为了保证前端逻辑的健壮性,我们用 Vitest 编写单元测试。
src/routes/search/search.test.tsx
:
import { test, expect, describe, vi } from 'vitest';
import { createDOM } from '@builder.io/qwik/testing';
import SearchComponent from './index';
// 我们不测试真实的 server$ 函数,而是模拟它的行为
// Vitest 的 vi.mock 功能非常适合处理这种依赖
vi.mock('@builder.io/qwik-city', async () => {
const original = await vi.importActual<typeof import('@builder.io/qwik-city')>('@builder.io/qwik-city');
return {
...original,
server$: vi.fn((fn) => fn), // 透明地传递函数
};
});
describe('Search Component', () => {
test('should render search input and initial state', async () => {
const { screen, render } = await createDOM();
await render(<SearchComponent />);
const input = screen.querySelector('input[type="search"]') as HTMLInputElement;
expect(input).not.toBeNull();
expect(input.placeholder).toBe('Search documents...');
const initialResults = screen.querySelector('.results-list');
expect(initialResults?.children.length).toBe(0);
});
// 对于交互的测试,Qwik的测试工具目前还在发展
// 但我们可以测试核心逻辑,比如状态管理和组件渲染
test('should display loading state', async () => {
const { screen, render, userEvent } = await createDOM();
await render(<SearchComponent />);
const input = screen.querySelector('input[type="search"]') as HTMLInputElement;
// 模拟用户输入,触发 useTask$
// 注意:直接测试 useTask$ 的异步行为比较复杂,
// 在这里我们主要验证组件结构
await userEvent(input, 'type', 'test query');
// 这里只是一个结构性的断言,真实场景下需要更复杂的测试策略来处理异步状态
expect(input.value).toBe('test query');
});
});
这里的测试是基础的,但在真实项目中,它能防止在重构时破坏基本的用户交互逻辑。
4. CircleCI 自动化流水线
最后,所有组件由 .circleci/config.yml
文件串联起来。
version: 2.1
# Orbs 是可重用的配置包,简化了常见任务
orbs:
node: circleci/[email protected]
docker: circleci/[email protected]
python: circleci/[email protected]
# 定义可复用的命令
commands:
# 安装和缓存 npm 依赖
install_frontend_deps:
steps:
- node/install-packages:
pkg-manager: npm
# 定义执行器 (执行环境)
executors:
node-executor:
docker:
- image: cimg/node:18.17
python-executor:
docker:
- image: cimg/python:3.9
jobs:
# Job 1: 测试前端应用
test_frontend:
executor: node-executor
steps:
- checkout
- install_frontend_deps
- run:
name: Run Vitest Unit Tests
command: npm test -- --run
# Job 2: 构建并推送模型服务 Docker 镜像
build_and_push_model_service:
executor: docker/docker
parameters:
model_version:
type: string
description: "Tag for the docker image, e.g., git commit hash"
steps:
- checkout
- docker/check
- docker/build:
image: my-docker-hub-user/model-service
tag: << parameters.model_version >>
dockerfile: model_service/Dockerfile
path: .
# 传递模型名称作为构建参数
extra_build_args: "--build-arg MODEL_NAME=all-MiniLM-L12-v2"
- docker/push:
image: my-docker-hub-user/model-service
tag: << parameters.model_version >>
# Job 3: 部署模型服务 (示意)
# 真实项目中,这一步会使用 kubectl, helm, aws ecs cli 等工具
deploy_model_service:
executor: python-executor # 假设部署脚本用 Python
steps:
- run:
name: Deploy Model Service to Production
command: |
echo "Deploying new model service version..."
# deploy_script.sh my-docker-hub-user/model-service:$(git rev-parse --short HEAD)
echo "Deployment successful."
# Job 4: 运行重索引脚本
run_reindexing:
executor: python-executor
steps:
- checkout
- python/install-packages:
pip-dependency-file: opensearch_utils/requirements.txt
- run:
name: Run OpenSearch Re-indexing
command: |
# 从 CircleCI 的 Context 或环境变量中获取凭证
export OPENSEARCH_HOST=$OPENSEARCH_HOST
export OPENSEARCH_USER=$OPENSEARCH_USER
export OPENSEARCH_PASSWORD=$OPENSEARCH_PASSWORD
export MODEL_SERVICE_URL=$MODEL_SERVICE_URL
python opensearch_utils/reindex.py
# Job 5: 构建并部署前端应用
build_and_deploy_frontend:
executor: node-executor
steps:
- checkout
- install_frontend_deps
- run:
name: Build Qwik Application
command: npm run build
- run:
name: Deploy to Static Hosting
# 比如使用 AWS CLI 同步到 S3
command: |
echo "Deploying frontend to S3..."
# aws s3 sync ./dist s3://my-frontend-bucket --delete
workflows:
version: 2
update_and_deploy_model:
# 仅在 main 分支有提交时触发
when:
equal: [ main, << pipeline.git.branch >> ]
jobs:
- test_frontend
- build_and_push_model_service:
context: dockerhub-creds # 存储 Docker Hub 凭证的 Context
model_version: << pipeline.git.revision | substr 0 7 >>
- deploy_model_service:
requires:
- build_and_push_model_service
- run_reindexing:
context: opensearch-creds # 存储 OpenSearch 凭证的 Context
requires:
- deploy_model_service
- build_and_deploy_frontend:
context: aws-creds
requires:
- test_frontend
- run_reindexing
这个 CircleCI 配置定义了一个清晰、有依赖关系的工作流。它首先并行地测试前端和构建模型镜像。模型服务部署成功后,才触发最关键的重索引任务。最后,当前端测试和重索引都完成后,才部署新的前端静态文件。
局限性与未来迭代路径
这套方案有效地解决了中等规模数据集(百万到千万级别)下模型的零停机更新问题,但它并非没有局限。
首先,全量重索引的成本和时间会随着数据量的增长而线性增加。当数据达到数十亿规模时,每次更新都重新计算所有向量是不切实际的。未来的迭代方向是引入变更数据捕获(CDC)机制,例如使用 Debezium 从主数据库捕获数据变更,通过消息队列(如 Kafka)将增量更新实时推送到一个处理服务中,该服务再使用新模型计算向量并更新 OpenSearch。这样,更新模型就从“全量重建”变成了“追赶增量”。
其次,模型部署本身可以做得更精细。当前的流程是“一次性”切换。更稳妥的方式是采用金丝雀部署,让新旧两个版本的模型服务同时在线,通过流量切分(例如 2% 的请求发往新模型)来小范围验证新模型的性能和业务指标影响。这需要在我们的部署脚本和可能的API网关层面做更多工作。
最后,这套架构的适用边界在于模型更新频率和数据规模。对于需要高频(例如每小时)更新模型且数据量巨大的场景,基于批处理的重索引模式会成为瓶颈,流式处理架构(如 Flink + RocksDB)可能是更合适的选择。