使用Jest为Delta Lake上的Saga分布式事务模式构建确定性单元测试


一个看似简单的业务需求摆在面前:在一个用户注册流程中,我们需要原子性地向两个不同的Delta Lake表中写入数据。一张是 users 表,记录用户ID和认证信息;另一张是 user_profiles 表,记录用户的详细资料。如果任意一张表写入失败,整个注册操作必须回滚,不能留下任何脏数据。

在单体数据库中,一个 BEGIN TRANSACTION...COMMIT 就能解决。但在数据湖仓架构中,即便Delta Lake为单个表提供了ACID事务保证,跨多个表的原子性操作依然是一个经典的分布式系统难题。

一个直接的想法可能是这样的:

// naive_approach.ts
// 这是一个错误示范,它无法保证原子性
import { DeltaTable, writeDelta } from "@delta-rs/delta-js";
import { table } from "console";
import * as path from "path";
import * as fs from "fs-extra";
import { RecordBatch, makeTable } from "apache-arrow";

const USERS_TABLE_PATH = path.join(__dirname, "../test_tables/users_naive");
const PROFILES_TABLE_PATH = path.join(__dirname, "../test_tables/profiles_naive");

interface User {
  id: string;
  email: string;
  created_at: number;
}

interface Profile {
  user_id: string;
  full_name: string;
  bio: string;
}

// 辅助函数,用于写入数据
async function writeToTable(tablePath: string, data: RecordBatch) {
  const table = makeTable({
    ...data.schema.fields.reduce((acc, field) => ({ ...acc, [field.name]: field.type }), {}),
    ...Object.fromEntries(data.schema.fields.map((field, i) => [field.name, data.getChildAt(i)])),
  });
  await writeDelta(tablePath, table, { mode: "append" });
}

async function registerUserNaive(userId: string, email: string, fullName: string, bio: string): Promise<void> {
  // 确保目录存在
  await fs.ensureDir(USERS_TABLE_PATH);
  await fs.ensureDir(PROFILES_TABLE_PATH);

  const now = Date.now();
  
  const userRecord = RecordBatch.from({
    id: [userId],
    email: [email],
    created_at: [now],
  });

  const profileRecord = RecordBatch.from({
    user_id: [userId],
    full_name: [fullName],
    bio: [bio],
  });

  try {
    console.log("Step 1: Writing to users table...");
    await writeToTable(USERS_TABLE_PATH, userRecord);
    console.log("Step 1: Success.");

    // 模拟在第二步操作时发生致命错误
    if (fullName.includes("FAIL")) {
      throw new Error("Simulated infrastructure failure before writing to profiles table.");
    }
    
    console.log("Step 2: Writing to profiles table...");
    await writeToTable(PROFILES_TABLE_PATH, profileRecord);
    console.log("Step 2: Success.");

  } catch (error) {
    console.error("Registration failed. Attempting to rollback...", error);
    // 这里的回滚逻辑非常脆弱且不可靠
    // 如果删除 users 表记录的操作本身也失败了怎么办?
    // 如果在删除前进程崩溃了呢?
    const table = await DeltaTable.load(USERS_TABLE_PATH);
    await table.delete(`id = '${userId}'`); // 尝试补偿
    console.log("Rollback attempt finished.");
    throw new Error(`Failed to register user: ${error.message}`);
  }
}

上述代码的脆弱性显而易见。try...catch 块中的补偿操作(删除已插入的用户记录)本身也可能失败。如果进程在执行补偿逻辑之前就崩溃了,users 表就会留下一条孤儿数据,系统状态进入不一致。这正是CAP理论中一致性(C)与可用性(A)权衡的体现。强一致性的两阶段提交(2PC)协议在这种场景下过于笨重且会严重影响可用性。因此,在真实项目中,我们通常选择最终一致性方案,Saga模式是其中的佼佼者。

Saga模式与可测试的编排器设计

Saga模式的核心思想是将一个大的分布式事务拆分成一系列本地事务。每个本地事务都有一个对应的补偿(Compensating)操作。当Saga中的任何一个步骤失败时,系统会按相反的顺序依次调用已成功步骤的补偿操作,从而使系统状态回滚到初始状态。

要实现一个健壮的Saga,我们需要一个编排器(Orchestrator)。这个编排器负责按顺序执行每个步骤,并在失败时调用补偿逻辑。为了解决文章开头的问题,我们将用TypeScript设计并实现一个通用的、更重要的是——可单元测试的Saga编排器。

// src/saga_orchestrator.ts

// 定义Saga的每个步骤
export interface SagaStep<T> {
  name: string;
  // 执行正向操作,需要是幂等的
  action: (payload: T) => Promise<void>;
  // 执行补偿操作,必须是幂等的
  compensate: (payload: T) => Promise<void>;
}

// 定义编排器的日志接口,便于测试和监控
export interface SagaLogger {
  info(message: string): void;
  error(message: string, error?: any): void;
}

export class SagaOrchestrator<T> {
  private steps: SagaStep<T>[] = [];
  private logger: SagaLogger;

  constructor(logger: SagaLogger = console) {
    this.logger = logger;
  }

  // 添加一个Saga步骤
  addStep(step: SagaStep<T>): this {
    this.steps.push(step);
    return this;
  }

  // 执行Saga
  public async execute(payload: T): Promise<void> {
    const completedSteps: SagaStep<T>[] = [];

    for (const step of this.steps) {
      try {
        this.logger.info(`[Saga] Executing step: ${step.name}`);
        await step.action(payload);
        completedSteps.push(step);
        this.logger.info(`[Saga] Step successful: ${step.name}`);
      } catch (error) {
        this.logger.error(`[Saga] Step failed: ${step.name}`, error);
        await this.rollback(completedSteps, payload);
        // 重新抛出错误,让调用方知道Saga执行失败
        throw new Error(`Saga execution failed at step: ${step.name}`);
      }
    }
  }

  // 执行回滚
  private async rollback(completedSteps: SagaStep<T>[], payload: T): Promise<void> {
    this.logger.info("[Saga] Starting rollback...");
    
    // 按相反顺序执行补偿操作
    for (const step of [...completedSteps].reverse()) {
      try {
        this.logger.info(`[Saga] Compensating for step: ${step.name}`);
        await step.compensate(payload);
        this.logger.info(`[Saga] Compensation successful for step: ${step.name}`);
      } catch (compensationError) {
        // 这里的坑在于:如果补偿操作失败,系统就进入了一个需要人工干预的危险状态。
        // 在生产环境中,这里需要发出严重告警,并记录详细的上下文信息。
        this.logger.error(
          `[Saga] CRITICAL: Compensation for step ${step.name} failed. Manual intervention required.`,
          compensationError
        );
        // 尽管补偿失败,我们依然需要继续尝试回滚其他步骤
      }
    }
    this.logger.info("[Saga] Rollback finished.");
  }
}

这个编排器的设计有几个关键点:

  1. 通用性: 它通过泛型 T 接受任意类型的载荷(payload),不与具体业务逻辑耦合。
  2. 原子性步骤: SagaStep 接口清晰地定义了 actioncompensate,强制开发者思考正向和逆向操作。
  3. 状态追踪: completedSteps 数组是Saga执行状态的内存快照,用于驱动回滚。
  4. 日志: 注入一个 SagaLogger 接口,这在调试和测试时至关重要。我们可以传入 console,也可以在测试中传入一个mock logger来断言日志行为。
  5. 健壮的回滚: rollback 方法即使在某个补偿操作失败时也会继续尝试其他的,并记录严重错误。这是务实的设计,因为停止回滚可能会让系统状态更糟。

将Saga应用于Delta Lake的用户注册场景

现在,我们用这个编排器来重构之前的用户注册逻辑。

// src/user_registration_saga.ts
import { DeltaTable, writeDelta } from "@delta-rs/delta-js";
import { RecordBatch } from "apache-arrow";
import { SagaOrchestrator, SagaStep } from "./saga_orchestrator";
import { makeTable } from "apache-arrow";

// 定义Saga的载荷
export interface UserRegistrationPayload {
  userId: string;
  email: string;
  fullName: string;
  bio: string;
  // 注入表路径,便于测试隔离
  usersTablePath: string;
  profilesTablePath: string;
  // 用于在测试中注入失败
  simulateFailureInStep?: string;
}

// 辅助函数,封装Delta Lake写入逻辑
async function appendToDeltaTable(tablePath: string, data: any[]) {
    if (data.length === 0) return;
    const arrowTable = makeTable(data);
    await writeDelta(tablePath, arrowTable, { mode: "append" });
}

// 第一个步骤:创建用户
const createUsersStep: SagaStep<UserRegistrationPayload> = {
  name: "CreateUserRecord",
  action: async (payload) => {
    const userRecord = [{
      id: payload.userId,
      email: payload.email,
      created_at: Date.now(),
    }];
    await appendToDeltaTable(payload.usersTablePath, userRecord);

    if (payload.simulateFailureInStep === "CreateUserRecord") {
      throw new Error("Simulated failure in CreateUserRecord action");
    }
  },
  // 补偿操作:删除用户
  compensate: async (payload) => {
    // 这里的删除操作需要是幂等的。即使因为重试执行多次,结果也应相同。
    const table = await DeltaTable.load(payload.usersTablePath);
    await table.delete(`id = '${payload.userId}'`);
  },
};

// 第二个步骤:创建用户资料
const createProfileStep: SagaStep<UserRegistrationPayload> = {
  name: "CreateProfileRecord",
  action: async (payload) => {
    if (payload.simulateFailureInStep === "CreateProfileRecord") {
      throw new Error("Simulated failure in CreateProfileRecord action");
    }
    const profileRecord = [{
      user_id: payload.userId,
      full_name: payload.fullName,
      bio: payload.bio,
    }];
    await appendToDeltaTable(payload.profilesTablePath, profileRecord);
  },
  // 补偿操作:删除用户资料
  compensate: async (payload) => {
    const table = await DeltaTable.load(payload.profilesTablePath);
    await table.delete(`user_id = '${payload.userId}'`);
  },
};

// 创建并配置Saga
export function createUserRegistrationSaga(logger?: any): SagaOrchestrator<UserRegistrationPayload> {
  const saga = new SagaOrchestrator<UserRegistrationPayload>(logger);
  saga.addStep(createUsersStep).addStep(createProfileStep);
  return saga;
}

通过这种方式,业务逻辑被清晰地拆分到各个SagaStep中,主流程代码只需配置和执行Saga。simulateFailureInStep 属性是为单元测试埋下的钩子,它允许我们在特定步骤精确地注入故障。

sequenceDiagram
    participant Client
    participant Orchestrator
    participant CreateUserAction
    participant CreateUserCompensate
    participant CreateProfileAction
    participant CreateProfileCompensate

    Client->>+Orchestrator: execute(payload)
    Orchestrator->>+CreateUserAction: action(payload)
    CreateUserAction-->>-Orchestrator: success
    
    Orchestrator->>+CreateProfileAction: action(payload)
    Note right of CreateProfileAction: 发生致命错误!
    CreateProfileAction-->>-Orchestrator: failure
    
    Orchestrator->>+CreateUserCompensate: compensate(payload)
    Note left of Orchestrator: 开始回滚
    CreateUserCompensate-->>-Orchestrator: success
    
    Orchestrator-->>-Client: Saga Failed Error

使用Jest进行确定性单元测试

没有经过严格测试的分布式逻辑是不可信的。Saga模式最大的挑战之一就是如何有效地测试其补偿路径。借助Jest的mocking和spy能力,我们可以不依赖真实的延时或网络抖动,创建出确定性的、可重复的失败场景测试。

这是我们的测试文件结构和准备工作:

/
├── src/
│   ├── saga_orchestrator.ts
│   └── user_registration_saga.ts
├── tests/
│   ├── saga_orchestrator.test.ts
│   └── user_registration.integration.test.ts
├── test_tables/  (测试时动态创建)
└── package.json

package.json 依赖:

{
  "devDependencies": {
    "@types/jest": "^29.5.12",
    "@types/node": "^20.11.24",
    "jest": "^29.7.0",
    "ts-jest": "^29.1.2",
    "typescript": "^5.3.3"
  },
  "dependencies": {
    "@delta-rs/delta-js": "^0.9.0",
    "apache-arrow": "^15.0.0",
    "fs-extra": "^11.2.0"
  }
}

现在,编写集成测试来验证我们的Saga实现。

// tests/user_registration.integration.test.ts
import * as path from "path";
import * as fs from "fs-extra";
import { DeltaTable, DeltaTableJson } from "@delta-rs/delta-js";
import { createUserRegistrationSaga, UserRegistrationPayload } from "../src/user_registration_saga";

const TEST_DIR = path.join(__dirname, "../test_tables");
const USERS_TABLE_PATH = path.join(TEST_DIR, "users");
const PROFILES_TABLE_PATH = path.join(TEST_DIR, "profiles");

// 辅助函数,用于检查Delta表中的记录数
async function getTableCount(tablePath: string, filter?: string): Promise<number> {
  if (!await fs.pathExists(path.join(tablePath, "_delta_log"))) {
    return 0;
  }
  const table = await DeltaTable.load(tablePath);
  const selection = filter ? table.selectBy({ filter }) : table;
  const data = await selection.toArray();
  return data.length;
}

describe("UserRegistrationSaga Integration Test", () => {
  
  // 在每个测试用例开始前,清理并重新创建测试表目录
  beforeEach(async () => {
    await fs.remove(TEST_DIR);
    await fs.ensureDir(USERS_TABLE_PATH);
    await fs.ensureDir(PROFILES_TABLE_PATH);
  });
  
  // 测试结束后清理
  afterAll(async () => {
    await fs.remove(TEST_DIR);
  });

  test("should complete successfully when all steps succeed", async () => {
    const saga = createUserRegistrationSaga();
    const payload: UserRegistrationPayload = {
      userId: "user-123",
      email: "[email protected]",
      fullName: "John Doe",
      bio: "A test user.",
      usersTablePath: USERS_TABLE_PATH,
      profilesTablePath: PROFILES_TABLE_PATH,
    };

    await saga.execute(payload);

    // 断言:两个表都应该有且只有一条记录
    const userCount = await getTableCount(USERS_TABLE_PATH, `id = 'user-123'`);
    const profileCount = await getTableCount(PROFILES_TABLE_PATH, `user_id = 'user-123'`);
    
    expect(userCount).toBe(1);
    expect(profileCount).toBe(1);
  });

  test("should rollback correctly if the second step (CreateProfile) fails", async () => {
    const saga = createUserRegistrationSaga();
    const payload: UserRegistrationPayload = {
      userId: "user-456",
      email: "[email protected]",
      fullName: "Jane Fail",
      bio: "This will fail.",
      usersTablePath: USERS_TABLE_PATH,
      profilesTablePath: PROFILES_TABLE_PATH,
      // 关键:注入失败
      simulateFailureInStep: "CreateProfileRecord",
    };

    // 断言:Saga执行应该抛出异常
    await expect(saga.execute(payload)).rejects.toThrow("Saga execution failed at step: CreateProfileRecord");

    // 断言:补偿逻辑应该已经执行,两个表中都不应该有该用户的记录
    const userCount = await getTableCount(USERS_TABLE_PATH, `id = 'user-456'`);
    const profileCount = await getTableCount(PROFILES_TABLE_PATH, `user_id = 'user-456'`);

    expect(userCount).toBe(0);
    expect(profileCount).toBe(0);
    
    console.log("Verified that rollback cleans up data from all tables.");
  });
  
  test("should not write any data if the first step (CreateUser) fails", async () => {
    const saga = createUserRegistrationSaga();
    const payload: UserRegistrationPayload = {
      userId: "user-789",
      email: "[email protected]",
      fullName: "Peter Panic",
      bio: "Fails at the very beginning.",
      usersTablePath: USERS_TABLE_PATH,
      profilesTablePath: PROFILES_TABLE_PATH,
      simulateFailureInStep: "CreateUserRecord",
    };

    await expect(saga.execute(payload)).rejects.toThrow("Saga execution failed at step: CreateUserRecord");
    
    // 断言:因为第一步就失败了,没有任何数据应该被写入
    const userCount = await getTableCount(USERS_TABLE_PATH);
    const profileCount = await getTableCount(PROFILES_TABLE_PATH);

    expect(userCount).toBe(0);
    expect(profileCount).toBe(0);
  });
});

这个测试套件的核心价值在于 simulateFailureInStep。它让我们能够精确控制哪个环节出错,然后验证系统的最终状态是否符合预期。

  • Happy Path 测试: 验证了在一切正常时,数据能被正确写入。
  • 第二步失败测试: 这是最关键的测试。它模拟了 CreateProfileRecord 步骤的失败。测试断言不仅Saga会抛出异常,更重要的是,之前成功执行的 CreateUserRecord 步骤被正确补偿了(即 users 表中的记录被删除了)。这验证了我们的回滚逻辑。
  • 第一步失败测试: 验证了如果Saga在初始阶段就失败,不会留下任何中间状态。

这种基于故障注入的单元/集成测试方法,将原本难以捉摸的分布式系统行为,转化为了可以在CI/CD流水线中反复运行、稳定验证的确定性测试用例。

局限性与未来迭代路径

我们构建的这个内存Saga编排器在许多场景下已经足够有用,但它并非银弹。一个务实的工程师必须清楚其边界。

首先,编排器本身是单点故障。如果在 action 执行成功后、将该步骤加入 completedSteps 数组前,或者在启动 rollback 流程时,编排器进程崩溃,Saga的状态就会丢失,导致系统处于不一致状态且无法自动恢复。在对数据一致性要求极高的场景下,需要一个持久化的Saga日志。每次Saga状态变更(步骤开始、步骤成功、开始回滚等)都应先写入日志(例如,存入另一个高可用的数据库或Delta表),然后再执行操作。这样即使编排器重启,也能从日志中恢复Saga的上下文并继续执行或回滚。

其次,补偿操作失败的处理。当前实现只是记录了一个严重错误。在生产系统中,这应该触发一个告警,进入一个“死信队列”,并由运维人员或自动化的修复脚本进行干预。设计补偿操作时,必须尽最大努力使其成功,例如通过无限重试、采用更底层的API等。

最后,这个实现是编排(Orchestration)模式,还有一个对等的模式叫做协同(Choreography),服务通过监听彼此发布的事件来触发自己的本地事务或补偿操作。协同模式解耦度更高,但缺点是整个业务流程不清晰,难以监控和调试。我们选择的编排模式,虽然引入了编排器这个中心点,但换来的是对复杂流程的显式控制和更强的可测试性。


  目录