构建基于 Ray 的分布式 Solr 索引管道以驱动 ISR 页面近实时更新


我们面临一个棘手的伸缩性问题。生产环境的 SolrCloud 集群承载着数十亿级别的文档,并且每日新增数据量巨大。传统的增量索引方式,即直接将新文档推送到 Solr 集群进行处理和提交,已经触及了瓶颈。在高峰期,大量的索引写入请求严重影响了查询性能,CPU 负载居高不下,更糟糕的是,commit 操作带来的缓存失效和预热开销导致了用户可感的查询延迟抖动。前端团队采用的 ISR (Incremental Static Regeneration) 策略本意是提升用户体验,但在这种后端不稳定的情况下,页面重建时常抓取到陈旧或正在变更的数据,有时甚至超时,体验适得其反。

核心矛盾在于:索引写入的计算密集型负载与查询服务的低延迟要求在同一个集群内发生了冲突。

初步的构想是彻底将两者分离。我们不再向 live 集群直接写入,而是设计一个离线的、可水平扩展的索引构建管道。这个管道负责处理原始数据,生成完整的、可供直接使用的 Solr/Lucene 索引分片(segments)。当新一批索引分片构建完成后,我们通过某种机制,原子性地、近乎无缝地将它们“热切换”到在线服务集群中。这种架构,将繁重的索引构建任务转移到独立的计算资源池,保证了线上查询集群的绝对稳定。

技术选型上,Ray 脱颖而出。它轻量级的 Actor 和 Task 模型非常适合这种大规模并行的无状态数据处理任务。我们可以轻易地将文档处理和索引构建的逻辑封装成 Ray Actor,并根据数据量动态扩展成百上千个并行的 worker。而管理索引版本的“原子性切换”则触及了分布式一致性的核心。我们需要一个可靠的协调服务来记录和广播当前“权威”的索引版本,确保所有 Solr 节点和前端 ISR 的 revalidate 请求都能看到一致的视图。虽然 SolrCloud 自带的 ZooKeeper 就能扮演这个角色,但为了构建一个更通用的控制平面,我们决定独立实现一个轻量级的状态协调器,其背后的原理与 Raft 等一致性协议类似。最后,这个复杂的控制平面逻辑必须有坚实的测试保障,Vitest 的现代化和高性能特性使其成为测试我们 TypeScript 控制平面代码的首选。

整个系统的架构图如下:

flowchart TD
    subgraph "数据源 (Data Sources)"
        A[Kafka / S3 / DB]
    end

    subgraph "分布式索引构建管道 (Ray Cluster)"
        B(Ray Head Node)
        C1(Ray Worker 1 - IndexBuilderActor)
        C2(Ray Worker 2 - IndexBuilderActor)
        C3(Ray Worker N - IndexBuilderActor)
    end

    subgraph "索引存储 (Index Storage)"
        D[S3 / HDFS]
    end

    subgraph "控制平面 (Control Plane)"
        E{Index State Manager}
        F[PostgreSQL / etcd]
    end
    
    subgraph "查询服务层 (SolrCloud)"
        G1[Solr Node 1]
        G2[Solr Node 2]
        G3[Solr Node N]
    end

    subgraph "应用层 (Next.js App)"
        H{ISR Page}
    end

    A --> B
    B -- 分发任务 --> C1
    B -- 分发任务 --> C2
    B -- 分发任务 --> C3
    C1 -- 生成索引分片 --> D
    C2 -- 生成索引分片 --> D
    C3 -- 生成索引分片 --> D
    C1 & C2 & C3 -- 构建完成, 更新状态 --> E
    E -- 读写状态 --> F
    E -- 触发切换指令 --> G1
    E -- 触发切换指令 --> G2
    E -- 触发切换指令 --> G3
    G1 -- 拉取索引分片 --> D
    G2 -- 拉取索引分片 --> D
    G3 -- 拉取索引分片 --> D
    H -- revalidate 时查询 --> G1
    H -- revalidate 时查询元数据 --> E

步骤一:使用 Ray Actor 构建分布式索引生成器

核心任务是创建一个 IndexBuilderActor,它能够接收一批原始数据,并利用本地的 Lucene 库生成一个独立的 Solr 索引目录。这里的关键在于,每个 Actor 都在本地文件系统上工作,完成后将整个目录结构上传到共享的分布式文件系统(如 S3)。这避免了在构建过程中任何形式的网络 I/O 瓶颈。

# file: index_builder.py
import ray
import os
import shutil
import uuid
import logging
import pysolr
from lucene import initVM, IndexWriter, StandardAnalyzer, Document, Field, StringField, TextField, IndexWriterConfig

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 初始化Java VM for PyLucene
# 在Ray worker上,需要确保每个worker进程都初始化一次
if not os.environ.get("JCC_SKIP_INIT"):
    initVM(vmargs=['-Djava.awt.headless=true'])

@ray.remote
class IndexBuilderActor:
    def __init__(self, s3_client, bucket_name):
        """
        初始化Actor
        :param s3_client: 预配置的boto3 S3客户端实例
        :param bucket_name: 存储索引分片的S3桶名
        """
        self.s3_client = s3_client
        self.bucket_name = bucket_name
        self.local_build_path = f"/tmp/solr_index_build_{uuid.uuid4()}"
        os.makedirs(self.local_build_path, exist_ok=True)
        logging.info(f"Actor {ray.get_runtime_context().get_actor_id()} initialized. Build path: {self.local_build_path}")

    def _create_index_writer(self, path):
        """
        创建一个Lucene IndexWriter
        """
        config = IndexWriterConfig(StandardAnalyzer())
        config.setOpenMode(IndexWriterConfig.OpenMode.CREATE)
        return IndexWriter(path, config)

    def build_segment(self, data_batch: list[dict], index_version: str) -> str:
        """
        接收一批数据,构建一个索引分片,并上传到S3
        :param data_batch: 文档数据列表,每个dict是一个文档
        :param index_version: 当前构建的全局索引版本号
        :return: 上传到S3的索引分片路径
        """
        segment_id = str(uuid.uuid4())
        local_segment_path = os.path.join(self.local_build_path, segment_id)
        
        try:
            writer = self._create_index_writer(local_segment_path)
            logging.info(f"Starting to build segment {segment_id} for version {index_version} with {len(data_batch)} docs.")
            
            for doc_data in data_batch:
                if 'id' not in doc_data:
                    # 在真实项目中,错误处理应该更完善,例如发送到死信队列
                    logging.warning(f"Skipping document due to missing 'id': {doc_data}")
                    continue
                
                doc = Document()
                doc.add(StringField("id", str(doc_data["id"]), Field.Store.YES))
                
                # 假设我们有 title 和 content 字段
                if 'title' in doc_data:
                    doc.add(TextField("title", doc_data["title"], Field.Store.YES))
                if 'content' in doc_data:
                    doc.add(TextField("content", doc_data["content"], Field.Store.NO))
                
                writer.addDocument(doc)
            
            writer.commit()
            writer.close()
            logging.info(f"Segment {segment_id} build complete locally.")

            # 将构建好的索引目录上传到S3
            s3_prefix = f"indices/{index_version}/{segment_id}"
            self._upload_directory_to_s3(local_segment_path, s3_prefix)
            logging.info(f"Segment {segment_id} uploaded to s3://{self.bucket_name}/{s3_prefix}")
            
            return f"s3://{self.bucket_name}/{s3_prefix}"

        except Exception as e:
            logging.error(f"Error building segment {segment_id}: {e}", exc_info=True)
            # 异常处理:确保不返回错误的路径
            return None
        finally:
            # 清理本地临时文件
            if os.path.exists(local_segment_path):
                shutil.rmtree(local_segment_path)

    def _upload_directory_to_s3(self, local_path, s3_prefix):
        """
        辅助函数:上传整个目录到S3
        """
        for root, dirs, files in os.walk(local_path):
            for filename in files:
                local_file_path = os.path.join(root, filename)
                # s3_key保持相对路径结构
                relative_path = os.path.relpath(local_file_path, local_path)
                s3_key = os.path.join(s3_prefix, relative_path)
                self.s3_client.upload_file(local_file_path, self.bucket_name, s3_key)

    def destroy(self):
        """
        清理Actor创建的资源
        """
        shutil.rmtree(self.local_build_path)
        logging.info(f"Actor resources cleaned up.")

# --- Driver Code Example ---
# 在主进程中运行
# import boto3
# 
# def run_indexing_job(data_source, num_actors=10):
#     ray.init(address='auto') # 连接到已有的Ray集群
#     s3 = boto3.client('s3')
#     bucket = 'my-solr-indices'
#
#     actors = [IndexBuilderActor.remote(s3, bucket) for _ in range(num_actors)]
#     
#     new_index_version = f"v{uuid.uuid4()}"
#     # 注册到控制平面,状态为 'BUILDING'
#     # control_plane_api.create_version(new_index_version)
#     
#     results = []
#     for i, batch in enumerate(data_source.get_batches()):
#         actor = actors[i % num_actors]
#         results.append(actor.build_segment.remote(batch, new_index_version))
#
#     segment_paths = ray.get(results)
#     successful_paths = [p for p in segment_paths if p is not None]
#     
#     # 如果所有分片都成功,通知控制平面状态为 'READY'
#     # if len(successful_paths) == len(results):
#     #    control_plane_api.update_version_status(new_index_version, 'READY', successful_paths)
#     # else:
#     #    control_plane_api.update_version_status(new_index_version, 'FAILED')
#
#     ray.shutdown()

这个 Actor 的设计体现了几个关键点:

  1. 资源隔离: 每个 Actor 在自己独立的临时目录中工作,互不干扰。
  2. 无状态: Actor 本身不维护跨任务的状态,使其易于调度和替换。
  3. 错误处理: build_segment 方法有明确的 try...except...finally 结构,确保即使失败也能清理本地资源,并返回 None 信号。
  4. 与外部系统解耦: S3 客户端作为依赖注入,方便测试和替换。

步骤二:实现控制平面与分布式一致性

控制平面的核心是 Index State Manager。它需要提供 API 来管理索引版本的生命周期:BUILDING, READY, LIVE, DEPRECATED。当一个版本的所有分片在 Ray 集群上构建完成并上传至 S3 后,驱动程序会调用 API 将该版本的状态更新为 READY。此时,运维人员或一个自动化进程可以调用另一个 API,触发“上线”操作,将 READY 版本切换为 LIVE。这个切换动作必须是原子的。

在真实生产环境中,我们会用 etcd 或 ZooKeeper 来存储这个状态,利用其 CAS (Compare-And-Swap) 操作保证原子性。这里为了演示,我们用一个基于 Fastify 和 PostgreSQL 的简单服务来模拟,并重点讨论其 API 设计和状态转换逻辑。

// file: control-plane/src/server.ts
import fastify from 'fastify';
import { Pool } from 'pg';

// 假设我们有一个DB来持久化状态
const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
});

// 定义索引版本状态
type IndexVersionStatus = 'BUILDING' | 'READY' | 'LIVE' | 'FAILED' | 'DEPRECATED';

interface IndexVersion {
  id: number;
  version_name: string;
  status: IndexVersionStatus;
  segment_paths: string[];
  created_at: Date;
  updated_at: Date;
}

const server = fastify({ logger: true });

// 1. 创建一个新的索引版本
server.post('/versions', async (request, reply) => {
  const { versionName } = request.body as { versionName: string };
  if (!versionName) {
    return reply.status(400).send({ error: 'versionName is required' });
  }

  try {
    const { rows } = await pool.query<IndexVersion>(
      'INSERT INTO index_versions (version_name, status) VALUES ($1, $2) RETURNING *',
      [versionName, 'BUILDING']
    );
    return reply.status(201).send(rows[0]);
  } catch (err) {
    server.log.error(err);
    // 可能是版本名冲突
    if (err.code === '23505') {
      return reply.status(409).send({ error: 'Version name already exists' });
    }
    return reply.status(500).send({ error: 'Internal Server Error' });
  }
});

// 2. 更新版本状态(例如,从 BUILDING -> READY)
server.patch('/versions/:versionName', async (request, reply) => {
  const { versionName } = request.params as { versionName: string };
  const { status, segmentPaths } = request.body as { status: IndexVersionStatus; segmentPaths?: string[] };

  // 这里的状态转换逻辑是核心,必须严格控制
  const allowedTransitions: Record<IndexVersionStatus, IndexVersionStatus[]> = {
    BUILDING: ['READY', 'FAILED'],
    READY: ['LIVE', 'FAILED'],
    LIVE: ['DEPRECATED'],
    FAILED: [],
    DEPRECATED: [],
  };

  try {
    // 使用事务保证原子性
    const client = await pool.connect();
    try {
      await client.query('BEGIN');
      const { rows } = await client.query<IndexVersion>('SELECT * FROM index_versions WHERE version_name = $1 FOR UPDATE', [versionName]);
      if (rows.length === 0) {
        return reply.status(404).send({ error: 'Version not found' });
      }
      const currentVersion = rows[0];

      if (!allowedTransitions[currentVersion.status]?.includes(status)) {
        return reply.status(400).send({
          error: `Invalid status transition from ${currentVersion.status} to ${status}`,
        });
      }

      let queryText = 'UPDATE index_versions SET status = $1, updated_at = NOW()';
      const queryParams: any[] = [status];
      
      if (segmentPaths) {
        queryText += ', segment_paths = $2 WHERE version_name = $3 RETURNING *';
        queryParams.push(JSON.stringify(segmentPaths), versionName);
      } else {
        queryText += ' WHERE version_name = $2 RETURNING *';
        queryParams.push(versionName);
      }

      const { rows: updatedRows } = await client.query<IndexVersion>(queryText, queryParams);
      
      await client.query('COMMIT');
      return reply.send(updatedRows[0]);
    } catch (e) {
      await client.query('ROLLBACK');
      throw e;
    } finally {
      client.release();
    }
  } catch (err) {
    server.log.error(err);
    return reply.status(500).send({ error: 'Internal Server Error' });
  }
});

// 3. 原子性地切换 LIVE 版本
server.post('/actions/promote-live', async (request, reply) => {
    const { versionName } = request.body as { versionName: string };
    if (!versionName) {
        return reply.status(400).send({ error: 'versionName is required to promote' });
    }

    const client = await pool.connect();
    try {
        await client.query('BEGIN');
        
        // 找到当前 LIVE 的版本
        const { rows: currentLiveRows } = await client.query<IndexVersion>(
            "SELECT * FROM index_versions WHERE status = 'LIVE' FOR UPDATE"
        );

        // 找到准备提升为 LIVE 的版本
        const { rows: newLiveRows } = await client.query<IndexVersion>(
            "SELECT * FROM index_versions WHERE version_name = $1 FOR UPDATE",
            [versionName]
        );

        if (newLiveRows.length === 0) {
            return reply.status(404).send({ error: `Version ${versionName} not found` });
        }
        if (newLiveRows[0].status !== 'READY') {
            return reply.status(400).send({ error: `Version ${versionName} is not in READY state` });
        }

        // 将旧的 LIVE 版本设置为 DEPRECATED
        if (currentLiveRows.length > 0) {
            await client.query(
                "UPDATE index_versions SET status = 'DEPRECATED', updated_at = NOW() WHERE id = $1",
                [currentLiveRows[0].id]
            );
        }

        // 提升新版本为 LIVE
        const { rows: promotedRows } = await client.query<IndexVersion>(
            "UPDATE index_versions SET status = 'LIVE', updated_at = NOW() WHERE version_name = $1 RETURNING *",
            [versionName]
        );

        await client.query('COMMIT');

        // 在这里触发对 SolrCloud 的 API 调用,使用 Solr Collections API 的 ALIAS 功能原子切换
        // e.g., triggerSolrAliasSwitch(old_live_collection, new_live_collection);
        
        server.log.info(`Promoted ${versionName} to LIVE.`);
        return reply.send(promotedRows[0]);

    } catch (e) {
        await client.query('ROLLBACK');
        server.log.error(e);
        return reply.status(500).send({ error: 'Internal Server Error during promotion' });
    } finally {
        client.release();
    }
});


// 4. 获取当前 LIVE 的版本信息
server.get('/versions/live', async (request, reply) => {
    const { rows } = await pool.query<IndexVersion>("SELECT * FROM index_versions WHERE status = 'LIVE' LIMIT 1");
    if (rows.length === 0) {
        return reply.status(404).send({ error: 'No live version found' });
    }
    return reply.send(rows[0]);
});

server.listen({ port: 3000 }, (err, address) => {
    if (err) {
        server.log.error(err);
        process.exit(1);
    }
});

这个控制平面的实现,虽然简单,但通过数据库事务和 SELECT ... FOR UPDATE 锁,模拟了分布式一致性中的关键操作——原子性状态转换。在 Solr 端,最优雅的实现是为每个索引版本创建一个独立的 collection,然后使用 Solr 的 Collection Alias 功能进行原子切换。当 promote-live API 被调用成功后,控制平面会执行一个命令,将一个固定的别名(例如 products_live)从指向旧的 collection 切换到指向新的 collection。所有客户端都只查询这个别名,从而实现无缝切换。

步骤三:使用 Vitest 测试控制平面逻辑

任何一个协调服务,其可靠性都至关重要。它的状态转换逻辑必须经过严格测试。Vitest 的 mockin-source testing 特性非常适合这种场景。

// file: control-plane/src/server.test.ts
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { Pool } from 'pg';

// 我们将模拟整个 'pg' 库
vi.mock('pg', () => {
    const mockClient = {
        query: vi.fn(),
        release: vi.fn(),
    };
    const mockPool = {
        connect: vi.fn(() => Promise.resolve(mockClient)),
        query: vi.fn(),
    };
    return { Pool: vi.fn(() => mockPool) };
});

const mockPool = new Pool();
const mockClient = await mockPool.connect();

// 需要在测试中导入 server 实例
// 这需要对 server.ts 进行一些重构,导出 server 实例而不是直接监听
import { buildServer } from './server'; // 假设重构后的 server.ts 导出了一个 buildServer 函数
const server = buildServer();

describe('Index Version State Manager API', () => {
    beforeEach(() => {
        // 在每个测试前重置所有 mock
        vi.resetAllMocks();
        // 模拟事务控制
        mockClient.query.mockImplementation((queryText) => {
            if (queryText === 'BEGIN' || queryText === 'COMMIT' || queryText === 'ROLLBACK') {
                return Promise.resolve();
            }
            return Promise.resolve({ rows: [] }); // 默认返回空
        });
    });

    it('should promote a READY version to LIVE and deprecate the old one', async () => {
        // Arrange
        const oldLiveVersion = { id: 1, version_name: 'v1', status: 'LIVE' };
        const readyVersion = { id: 2, version_name: 'v2', status: 'READY' };

        // 模拟数据库返回
        mockClient.query
            .mockResolvedValueOnce({ rows: [oldLiveVersion] }) // SELECT old live
            .mockResolvedValueOnce({ rows: [readyVersion] })   // SELECT new live
            .mockResolvedValueOnce({ rows: [] })               // UPDATE old live to DEPRECATED
            .mockResolvedValueOnce({ rows: [{...readyVersion, status: 'LIVE'}] }); // UPDATE new live

        // Act
        const response = await server.inject({
            method: 'POST',
            url: '/actions/promote-live',
            payload: { versionName: 'v2' },
        });

        // Assert
        expect(response.statusCode).toBe(200);
        expect(JSON.parse(response.payload).status).toBe('LIVE');
        expect(JSON.parse(response.payload).version_name).toBe('v2');

        // 验证数据库调用
        expect(mockClient.query).toHaveBeenCalledWith('BEGIN');
        expect(mockClient.query).toHaveBeenCalledWith(expect.stringContaining("status = 'LIVE' FOR UPDATE"));
        expect(mockClient.query).toHaveBeenCalledWith(expect.stringContaining("version_name = $1 FOR UPDATE"), ['v2']);
        expect(mockClient.query).toHaveBeenCalledWith(expect.stringContaining("SET status = 'DEPRECATED'"), [1]);
        expect(mockClient.query).toHaveBeenCalledWith(expect.stringContaining("SET status = 'LIVE'"), ['v2']);
        expect(mockClient.query).toHaveBeenCalledWith('COMMIT');
    });

    it('should fail to promote a version that is not in READY state', async () => {
        // Arrange
        const buildingVersion = { id: 2, version_name: 'v2', status: 'BUILDING' };
        mockClient.query
            .mockResolvedValueOnce({ rows: [] }) // No current LIVE
            .mockResolvedValueOnce({ rows: [buildingVersion] });

        // Act
        const response = await server.inject({
            method: 'POST',
            url: '/actions/promote-live',
            payload: { versionName: 'v2' },
        });

        // Assert
        expect(response.statusCode).toBe(400);
        expect(JSON.parse(response.payload).error).toContain('not in READY state');
        expect(mockClient.query).not.toHaveBeenCalledWith(expect.stringContaining("SET status = 'DEPRECATED'"));
        expect(mockClient.query).toHaveBeenCalledWith('ROLLBACK');
    });
});

步骤四:集成 ISR 前端

前端的改造相对简单。在 Next.js 页面的 getStaticProps 中,当 revalidate 被触发时,我们不再是简单地查询 Solr。而是增加一个步骤:首先查询控制平面的 /versions/live 接口,获取当前权威的 collection 别名或版本标识。

// file: pages/search.js
import { searchClient } from '../lib/solr'; // 假设的 Solr 客户端

export async function getStaticProps(context) {
  // 1. 从我们的控制平面获取当前的 LIVE 版本信息
  let liveVersionInfo;
  try {
    const res = await fetch('http://control-plane:3000/versions/live');
    if (!res.ok) {
        // 容错处理:如果控制平面不可用,可以考虑使用一个已知的、最近的 LIVE 版本作为后备
        // 或者直接返回错误,强制页面重新构建失败,等待系统恢复
        throw new Error('Control Plane is not available.');
    }
    liveVersionInfo = await res.json();
  } catch (error) {
    console.error("Failed to fetch live version:", error);
    // 返回一个 props,让页面可以展示一个错误状态
    return { props: { error: 'Search service is temporarily unavailable.' }, revalidate: 10 };
  }

  // 2. 使用获取到的版本信息来查询 Solr
  // 这里的关键是,查询的目标是动态的,由控制平面决定
  // 方案A: 查询一个固定的别名,该别名由控制平面在切换时原子更新
  const collectionAlias = 'products_live'; 
  const searchResults = await searchClient.query(collectionAlias, {
    q: context.params?.query || '*:*',
    // ... 其他查询参数
  });

  // 方案B: 如果不用别名,而是直接查询版本化的 collection
  // const collectionName = `products_${liveVersionInfo.version_name}`;
  // const searchResults = await searchClient.query(collectionName, ...);

  return {
    props: {
      results: searchResults,
      version: liveVersionInfo.version_name, // 可以把版本号传给前端用于调试
    },
    // revalidate 时间可以设置得相对短,因为后端切换成本很低
    revalidate: 60,
  };
}

// ... React Component ...

这种模式下,ISR 的 revalidate 变得非常高效和安全。页面重建总能命中一个稳定、预热完成的 Solr collection。即使在索引版本切换的瞬间,由于别名切换是原子的,请求只会被路由到旧或新的 collection,绝不会出现数据不一致或查询失败的情况。

方案局限性与未来展望

这个架构虽然解决了索引与查询负载冲突的核心问题,但并非没有权衡。首先,它引入了数据延迟。从数据产生到最终能在前端被搜索到,需要经历整个 Ray 管道的处理时间。这决定了该方案不适用于对实时性要求极高的场景,它是一种“近实时”的解决方案。延迟的时间窗口取决于数据量大小和 Ray 集群的计算能力,可能在几分钟到几十分钟不等。

其次,系统的复杂性显著增加。我们引入了 Ray 集群、一个独立的控制平面和一套复杂的运维流程。这对团队的技能栈和监控体系都提出了更高的要求。控制平面自身的可用性也成了关键路径,虽然我们讨论了使用 etcd 等成熟方案,但其部署和维护同样需要成本。

未来的优化路径可以集中在几个方面:

  1. 混合模式索引: 对于小部分需要极低延迟的更新,可以保留一个小的、独立的实时索引核心,并通过查询时合并结果的方式来兼顾实时性与大规模数据的查询性能。
  2. 自动化与智能化: 将 promote-live 的触发过程自动化。例如,基于对新构建索引的自动化质量校验(文档数、关键字段分布等)结果,自动执行上线流程,进一步减少人工干预。
  3. 成本优化: Ray 集群的资源可以基于任务队列的长度进行动态伸缩,在没有索引任务时缩减到零,以节省计算成本。同样,可以制定策略定期清理 S3 中 DEPRECATED 的索引分片。

  目录