Pulsar驱动的实时向量摄取架构在Vue与React Native前端的应用实现


定义一个棘手的技术问题:如何为一个跨平台应用(同时服务Vue.js Web端和React Native移动端)构建一个知识库系统,使用户提交的非结构化文本内容,能在数秒内通过语义相似度被检索到?这里的核心挑战在于,Embedding过程(将文本转化为向量)可能是一个耗时且计算密集的操作,我们绝不能让它阻塞前端UI,也不能因为它暂时的失败而丢失用户数据。

方案A:简单但脆弱的同步API架构

一个最直观的设计是采用同步API调用。当用户在前端提交一段文本时,客户端直接调用一个后端API。

sequenceDiagram
    participant FE as 前端 (Vue/RN)
    participant GW as API网关
    participant BE as 后端服务
    participant EM as Embedding模型
    participant VDB as ChromaDB

    FE->>+GW: POST /ingest (包含文本)
    GW->>+BE: process_ingest(text)
    BE->>+EM: generate_vector(text)
    Note right of BE: 网络调用,可能耗时100ms-2s
    EM-->>-BE: 返回向量
    BE->>+VDB: upsert(vector, metadata)
    Note right of BE: 数据库写入
    VDB-->>-BE: 写入成功
    BE-->>-GW: { "status": "ok" }
    GW-->>-FE: 响应成功

这种架构的优点是逻辑清晰,实现简单。前端发起请求,等待后端完成所有处理(生成向量、写入向量数据库)后返回结果。

但它的问题在生产环境中是致命的:

  1. 糟糕的用户体验:Embedding可能很慢。如果模型较大或GPU资源紧张,一个请求可能耗时数秒。在这期间,前端UI会一直处于等待状态,用户会感觉应用“卡顿”。
  2. 紧耦合与脆弱性:前端、后端服务、Embedding模型服务、向量数据库之间是紧耦合的同步调用链。链条上任何一个环节出现网络抖动或服务过载,整个请求就会失败,用户提交的数据可能会丢失,除非实现复杂的重试逻辑。
  3. 缺乏弹性与可伸缩性:如果短时间内有大量用户提交内容,Embedding服务会成为整个系统的瓶颈。我们无法独立地扩展数据接收能力和数据处理能力。数据接收的峰值会直接冲击到最慢的处理环节。

在真实项目中,这种设计很快就会遇到天花板。它适用于低流量、对实时性要求不高的内部系统,但对于一个面向用户的、追求极致体验的应用来说,这套方案几乎是不可接受的。

方案B:基于Pulsar的事件驱动异步管道

为了解决同步架构的根本性问题,我们需要引入异步和解耦。这正是消息队列大放异彩的场景。我们选择Apache Pulsar,因为它云原生的特性、多租户、分层存储以及灵活的消费模型,实在是太适合这种场景了。

新的架构如下:

graph TD
    subgraph "用户交互层"
        A[Vue.js Web App] --> C{API 网关};
        B[React Native App] --> C;
    end

    subgraph "数据接收与缓冲"
        C -- "1. 提交文本, 立即返回" --> D[轻量级Ingestion Service];
        D -- "2. 发布'待处理'事件" --> E[Pulsar Topic: waiting_for_embedding];
    end

    subgraph "异步处理管道"
        E -- "3. 订阅事件" --> F[Embedding Consumer Service];
        F -- "4. 调用模型生成向量" --> G[Embedding Model Service];
        F -- "5. 向量与元数据写入" --> H[ChromaDB];
    end

    subgraph "数据查询层"
        A --> I[Search Service];
        B --> I;
        I -- "6. 语义查询" --> H;
    end

    style F fill:#f9f,stroke:#333,stroke-width:2px
    style E fill:#ccf,stroke:#333,stroke-width:2px

这套架构彻底改变了工作方式。

  1. 极致的用户体验:前端调用Ingestion Service,这个服务唯一的职责就是校验数据并将它作为一个消息扔进Pulsar,然后立即返回成功。整个过程可能只需要20毫秒,用户感觉不到任何延迟。
  2. 松耦合与高韧性:Ingestion Service和Embedding Consumer Service是完全解耦的。即使Embedding模型服务或ChromaDB暂时宕机,消息也会被持久化在Pulsar中。一旦下游服务恢复,Pulsar会自动重新投递,确保数据不会丢失。这为我们赢得了宝贵的故障恢复时间。
  3. 独立的扩展性:如果数据提交量激增,我们只需要扩展Ingestion Service和Pulsar的Broker。如果Embedding处理速度跟不上,我们可以简单地增加Embedding Consumer Service的实例数量,利用Pulsar的共享订阅(Shared Subscription)模式来并行处理消息,实现水平扩展。

最酷的是,Pulsar的这种架构让我们能够构建一个真正的“数据管道”。未来如果想增加新的处理步骤,比如对文本进行内容审核、提取关键词、或者备份到对象存储,我们只需要增加一个新的消费者订阅同一个Topic即可,完全不需要改动现有逻辑。

最终选择与理由

毫无疑问,方案B是构建一个健壮、可扩展、用户体验优秀系统的唯一选择。初期增加的复杂度(引入Pulsar)换来的是长期的系统稳定性和业务灵活性。对于一个探索性、追求技术卓越的团队来说,这不仅仅是一个技术选择,更是一种架构理念的体现:面向失败设计,拥抱异步,构建可演化的系统。

核心实现概览

我们将使用Python构建后端服务,因为它拥有丰富的AI/ML生态。前端部分仅作示意。

1. Pulsar 主题与生产者(Ingestion Service)

这是一个基于FastAPI的简单服务,它接收来自前端的HTTP请求,并将数据发布到Pulsar。

ingestion_service.py

import pulsar
import json
import logging
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel
import os

# --- 配置 ---
PULSAR_SERVICE_URL = os.getenv("PULSAR_SERVICE_URL", "pulsar://localhost:6650")
PULSAR_TOPIC = os.getenv("PULSAR_TOPIC", "persistent://public/default/document-ingestion")

# --- 日志配置 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Pulsar客户端初始化 ---
# 在生产环境中,客户端应该是单例的,并处理好连接生命周期
try:
    client = pulsar.Client(PULSAR_SERVICE_URL)
    producer = client.create_producer(PULSAR_TOPIC)
    logging.info(f"成功连接到 Pulsar 并创建了到主题 {PULSAR_TOPIC} 的生产者")
except Exception as e:
    logging.error(f"无法连接到 Pulsar: {e}", exc_info=True)
    # 在应用启动时就失败,避免服务以不健康的状态运行
    raise

app = FastAPI()

class Document(BaseModel):
    user_id: str
    document_id: str
    text: str
    source: str # 'vue_app' or 'react_native_app'

@app.post("/ingest", status_code=status.HTTP_202_ACCEPTED)
async def ingest_document(doc: Document):
    """
    接收文档,将其序列化后异步发送到 Pulsar。
    这个端点会立即返回,不会等待后续处理。
    """
    try:
        # 将Pydantic模型转换为字典,然后转为JSON字符串
        message_payload = doc.model_dump_json().encode('utf-8')

        # 异步发送消息
        # 使用 document_id 作为 key,可以保证相同文档的更新按顺序处理(如果Pulsar主题分区设置了Key_Shared模式)
        producer.send_async(message_payload, partition_key=doc.document_id, callback=ack_callback)

        logging.info(f"成功将文档 {doc.document_id} 发送到 Pulsar 主题")
        return {"message": "Document accepted for processing.", "document_id": doc.document_id}
    except Exception as e:
        logging.error(f"发送消息到 Pulsar 失败: {e}", exc_info=True)
        # 即使是异步,发送操作本身也可能因本地缓冲区满等原因失败
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Failed to queue document for processing. Please try again later."
        )

def ack_callback(res, msg_id):
    """
    消息发送到Pulsar Broker后的回调函数。
    主要用于日志记录或监控。
    """
    if res == pulsar.Result.Ok:
        logging.info(f"消息 {msg_id} 已被 Pulsar Broker 确认")
    else:
        logging.error(f"消息 {msg_id} 发送到 Pulsar Broker 失败: {res}")

@app.on_event("shutdown")
def shutdown_event():
    """
    应用关闭时,优雅地关闭Pulsar客户端和生产者。
    """
    logging.info("正在关闭 Pulsar 生产者和客户端...")
    producer.close()
    client.close()
    logging.info("Pulsar 资源已关闭")

这里的核心是producer.send_async。它将消息放入一个内部队列后立即返回,由后台线程负责发送。这确保了API端点的极低延迟。ack_callback提供了一种机制来确认消息是否已成功被Broker接收。

2. 向量生成消费者 (Embedding Consumer Service)

这个服务是整个管道的核心。它持续地从Pulsar拉取消息,调用模型,然后将结果写入ChromaDB。

embedding_consumer.py

import pulsar
import json
import logging
import chromadb
from sentence_transformers import SentenceTransformer
import time
import os

# --- 配置 ---
PULSAR_SERVICE_URL = os.getenv("PULSAR_SERVICE_URL", "pulsar://localhost:6650")
PULSAR_TOPIC = os.getenv("PULSAR_TOPIC", "persistent://public/default/document-ingestion")
SUBSCRIPTION_NAME = "embedding-worker-subscription"
CHROMA_HOST = os.getenv("CHROMA_HOST", "localhost")
CHROMA_PORT = int(os.getenv("CHROMA_PORT", "8000"))
COLLECTION_NAME = "app_knowledge_base"
MODEL_NAME = 'all-MiniLM-L6-v2' # 一个轻量但效果不错的模型

# --- 日志配置 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class EmbeddingWorker:
    def __init__(self):
        logging.info("正在初始化 Embedding Worker...")
        # 1. 初始化模型 (这可能是个耗时操作)
        self.model = SentenceTransformer(MODEL_NAME)
        logging.info(f"SentenceTransformer 模型 '{MODEL_NAME}' 加载成功")

        # 2. 初始化 ChromaDB 客户端
        self.chroma_client = chromadb.HttpClient(host=CHROMA_HOST, port=CHROMA_PORT)
        self.collection = self.chroma_client.get_or_create_collection(name=COLLECTION_NAME)
        logging.info(f"成功连接到 ChromaDB 并获取/创建集合 '{COLLECTION_NAME}'")

        # 3. 初始化 Pulsar 客户端和消费者
        self.pulsar_client = pulsar.Client(PULSAR_SERVICE_URL)
        self.consumer = self.pulsar_client.subscribe(
            PULSAR_TOPIC,
            subscription_name=SUBSCRIPTION_NAME,
            subscription_type=pulsar.SubscriptionType.Shared, # 允许多个实例并行消费
            negative_ack_redelivery_delay_ms=60000 # 如果nack,1分钟后重试
        )
        logging.info(f"成功订阅 Pulsar 主题 '{PULSAR_TOPIC}',订阅名 '{SUBSCRIPTION_NAME}'")

    def run(self):
        logging.info("Embedding Worker 开始监听消息...")
        while True:
            try:
                msg = self.consumer.receive() # 阻塞直到收到消息
                data = None
                try:
                    data = json.loads(msg.data().decode('utf-8'))
                    logging.info(f"收到消息: document_id={data['document_id']}")

                    # --- 核心处理逻辑 ---
                    # 1. 生成向量
                    text_to_embed = data['text']
                    vector = self.model.encode(text_to_embed).tolist()

                    # 2. 准备元数据
                    metadata = {
                        "user_id": data['user_id'],
                        "source": data['source'],
                        "ingested_at": int(time.time())
                    }

                    # 3. 写入 ChromaDB
                    # 使用 upsert,如果 document_id 已存在则更新,否则插入
                    self.collection.upsert(
                        ids=[data['document_id']],
                        embeddings=[vector],
                        metadatas=[metadata]
                    )
                    
                    logging.info(f"成功处理并存储了 document_id={data['document_id']} 的向量")
                    
                    # 4. 确认消息,Pulsar将不再投递此消息
                    self.consumer.acknowledge(msg)

                except Exception as e:
                    logging.error(f"处理消息失败 (document_id={data.get('document_id', 'N/A')}): {e}", exc_info=True)
                    # 发生未知错误,不确认消息,让Pulsar在延迟后重新投递
                    # 这是一个关键的容错机制!
                    self.consumer.negative_acknowledge(msg)
            
            except KeyboardInterrupt:
                logging.info("检测到中断信号,正在关闭...")
                break
            except Exception as e:
                logging.error(f"Pulsar 消费者发生严重错误: {e}", exc_info=True)
                # 等待一段时间后重试,避免在Pulsar连接问题时快速耗尽CPU
                time.sleep(5)
    
    def close(self):
        self.pulsar_client.close()
        logging.info("Pulsar 客户端已关闭")


if __name__ == "__main__":
    worker = EmbeddingWorker()
    try:
        worker.run()
    finally:
        worker.close()

这段代码的健壮性体现在:

  • 共享订阅(Shared):允许多个embedding_consumer.py进程实例同时运行,Pulsar会以轮询方式将消息分发给它们,轻松实现水平扩展。
  • 消息确认机制:只有当所有步骤(生成向量、写入ChromaDB)都成功后,才调用self.consumer.acknowledge(msg)
  • 否定确认(NACK):如果在处理过程中发生任何异常,调用self.consumer.negative_acknowledge(msg)。Pulsar会根据配置的延迟时间(negative_ack_redelivery_delay_ms)后重新投递该消息给一个可用的消费者。这避免了因暂时性问题(如数据库网络抖动)导致的数据丢失。在生产环境中,还可以配置死信队列(Dead Letter Queue)来处理反复失败的消息。

3. 前端集成与查询服务

查询路径相对简单,它仍然是一个同步的HTTP API。前端(Vue或React Native)调用这个API来执行语义搜索。

search_service.py (一个独立的FastAPI服务)

import logging
import chromadb
from sentence_transformers import SentenceTransformer
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import os

# --- 配置 ---
CHROMA_HOST = os.getenv("CHROMA_HOST", "localhost")
CHROMA_PORT = int(os.getenv("CHROMA_PORT", "8000"))
COLLECTION_NAME = "app_knowledge_base"
MODEL_NAME = 'all-MiniLM-L6-v2'

# --- 日志配置 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- 初始化 ---
app = FastAPI()
model = SentenceTransformer(MODEL_NAME)
chroma_client = chromadb.HttpClient(host=CHROMA_HOST, port=CHROMA_PORT)
collection = chroma_client.get_collection(name=COLLECTION_NAME)

class SearchQuery(BaseModel):
    query_text: str
    top_k: int = 5

@app.post("/search")
async def search(query: SearchQuery):
    try:
        logging.info(f"收到查询: '{query.query_text}', top_k={query.top_k}")
        
        # 1. 将查询文本也转化为向量
        query_vector = model.encode(query.query_text).tolist()

        # 2. 在 ChromaDB 中执行查询
        results = collection.query(
            query_embeddings=[query_vector],
            n_results=query.top_k,
            # include=['metadatas', 'documents', 'distances'] # 可以获取更多信息
        )
        
        logging.info(f"查询到 {len(results.get('ids', [[]])[0])} 个结果")
        return results
    except Exception as e:
        logging.error(f"查询失败: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error during search.")

前端Vue.js或React Native应用中,发起搜索请求的代码会非常直接:

api.js (示意代码)

//
// 这是一个在Vue或React Native项目中使用的通用API服务文件
//
import axios from 'axios';

const apiClient = axios.create({
    baseURL: 'http://your-api-gateway.com', // 指向你的API网关
    timeout: 10000,
});

// 用于提交文档,这是一个“Fire and Forget”的操作
export const submitDocument = async ({ userId, documentId, text, source }) => {
    try {
        const response = await apiClient.post('/ingest', {
            user_id: userId,
            document_id: documentId,
            text,
            source,
        });
        // 立即返回,UI可以显示“提交成功,正在处理中...”
        return response.data; 
    } catch (error) {
        // 处理网络错误或Ingestion Service不可用的情况
        console.error("Error submitting document:", error);
        throw error;
    }
};

// 用于执行语义搜索
export const semanticSearch = async (queryText, topK = 5) => {
    try {
        const response = await apiClient.post('/search', {
            query_text: queryText,
            top_k: topK,
        });
        // 返回搜索结果以供UI渲染
        return response.data;
    } catch (error) {
        console.error("Error performing search:", error);
        throw error;
    }
};

架构的扩展性与局限性

这套架构的扩展性极佳。我们可以轻易地增加新的消费者来丰富功能,例如:

  • KeywordExtractionConsumer: 订阅同一个主题,使用NLP库提取关键词并存入关系型数据库。
  • ArchivingConsumer: 将原始文本备份到S3等对象存储。
  • AnalyticsConsumer: 统计文档来源、长度等信息并推送到分析系统。
    所有这些都可以并行进行,互不干扰。

当然,当前方案也存在一些局限性。首先,我们引入了Pulsar和ChromaDB两个新的中间件,这增加了系统的运维复杂度和成本。对于运维经验不足的团队,这可能是一个挑战。其次,这是一个最终一致性的系统,从用户提交到内容可被搜索到之间存在一个(通常是秒级)延迟。这个延迟对于大多数知识库场景是完全可以接受的,但对于要求强一致性的金融交易等场景则不适用。最后,为了简化,我们的Ingestion Service和Search Service非常薄,在生产环境中,它们前面需要一个功能完备的API网关来处理认证、限流、日志等横切关注点。未来的优化路径可能包括为Pulsar消息设置Schema(如Avro),以实现更强的数据治理和跨语言兼容性。


  目录