一个看似简单的业务需求摆在面前:在一个用户注册流程中,我们需要原子性地向两个不同的Delta Lake表中写入数据。一张是 users
表,记录用户ID和认证信息;另一张是 user_profiles
表,记录用户的详细资料。如果任意一张表写入失败,整个注册操作必须回滚,不能留下任何脏数据。
在单体数据库中,一个 BEGIN TRANSACTION...COMMIT
就能解决。但在数据湖仓架构中,即便Delta Lake为单个表提供了ACID事务保证,跨多个表的原子性操作依然是一个经典的分布式系统难题。
一个直接的想法可能是这样的:
// naive_approach.ts
// 这是一个错误示范,它无法保证原子性
import { DeltaTable, writeDelta } from "@delta-rs/delta-js";
import { table } from "console";
import * as path from "path";
import * as fs from "fs-extra";
import { RecordBatch, makeTable } from "apache-arrow";
const USERS_TABLE_PATH = path.join(__dirname, "../test_tables/users_naive");
const PROFILES_TABLE_PATH = path.join(__dirname, "../test_tables/profiles_naive");
interface User {
id: string;
email: string;
created_at: number;
}
interface Profile {
user_id: string;
full_name: string;
bio: string;
}
// 辅助函数,用于写入数据
async function writeToTable(tablePath: string, data: RecordBatch) {
const table = makeTable({
...data.schema.fields.reduce((acc, field) => ({ ...acc, [field.name]: field.type }), {}),
...Object.fromEntries(data.schema.fields.map((field, i) => [field.name, data.getChildAt(i)])),
});
await writeDelta(tablePath, table, { mode: "append" });
}
async function registerUserNaive(userId: string, email: string, fullName: string, bio: string): Promise<void> {
// 确保目录存在
await fs.ensureDir(USERS_TABLE_PATH);
await fs.ensureDir(PROFILES_TABLE_PATH);
const now = Date.now();
const userRecord = RecordBatch.from({
id: [userId],
email: [email],
created_at: [now],
});
const profileRecord = RecordBatch.from({
user_id: [userId],
full_name: [fullName],
bio: [bio],
});
try {
console.log("Step 1: Writing to users table...");
await writeToTable(USERS_TABLE_PATH, userRecord);
console.log("Step 1: Success.");
// 模拟在第二步操作时发生致命错误
if (fullName.includes("FAIL")) {
throw new Error("Simulated infrastructure failure before writing to profiles table.");
}
console.log("Step 2: Writing to profiles table...");
await writeToTable(PROFILES_TABLE_PATH, profileRecord);
console.log("Step 2: Success.");
} catch (error) {
console.error("Registration failed. Attempting to rollback...", error);
// 这里的回滚逻辑非常脆弱且不可靠
// 如果删除 users 表记录的操作本身也失败了怎么办?
// 如果在删除前进程崩溃了呢?
const table = await DeltaTable.load(USERS_TABLE_PATH);
await table.delete(`id = '${userId}'`); // 尝试补偿
console.log("Rollback attempt finished.");
throw new Error(`Failed to register user: ${error.message}`);
}
}
上述代码的脆弱性显而易见。try...catch
块中的补偿操作(删除已插入的用户记录)本身也可能失败。如果进程在执行补偿逻辑之前就崩溃了,users
表就会留下一条孤儿数据,系统状态进入不一致。这正是CAP理论中一致性(C)与可用性(A)权衡的体现。强一致性的两阶段提交(2PC)协议在这种场景下过于笨重且会严重影响可用性。因此,在真实项目中,我们通常选择最终一致性方案,Saga模式是其中的佼佼者。
Saga模式与可测试的编排器设计
Saga模式的核心思想是将一个大的分布式事务拆分成一系列本地事务。每个本地事务都有一个对应的补偿(Compensating)操作。当Saga中的任何一个步骤失败时,系统会按相反的顺序依次调用已成功步骤的补偿操作,从而使系统状态回滚到初始状态。
要实现一个健壮的Saga,我们需要一个编排器(Orchestrator)。这个编排器负责按顺序执行每个步骤,并在失败时调用补偿逻辑。为了解决文章开头的问题,我们将用TypeScript设计并实现一个通用的、更重要的是——可单元测试的Saga编排器。
// src/saga_orchestrator.ts
// 定义Saga的每个步骤
export interface SagaStep<T> {
name: string;
// 执行正向操作,需要是幂等的
action: (payload: T) => Promise<void>;
// 执行补偿操作,必须是幂等的
compensate: (payload: T) => Promise<void>;
}
// 定义编排器的日志接口,便于测试和监控
export interface SagaLogger {
info(message: string): void;
error(message: string, error?: any): void;
}
export class SagaOrchestrator<T> {
private steps: SagaStep<T>[] = [];
private logger: SagaLogger;
constructor(logger: SagaLogger = console) {
this.logger = logger;
}
// 添加一个Saga步骤
addStep(step: SagaStep<T>): this {
this.steps.push(step);
return this;
}
// 执行Saga
public async execute(payload: T): Promise<void> {
const completedSteps: SagaStep<T>[] = [];
for (const step of this.steps) {
try {
this.logger.info(`[Saga] Executing step: ${step.name}`);
await step.action(payload);
completedSteps.push(step);
this.logger.info(`[Saga] Step successful: ${step.name}`);
} catch (error) {
this.logger.error(`[Saga] Step failed: ${step.name}`, error);
await this.rollback(completedSteps, payload);
// 重新抛出错误,让调用方知道Saga执行失败
throw new Error(`Saga execution failed at step: ${step.name}`);
}
}
}
// 执行回滚
private async rollback(completedSteps: SagaStep<T>[], payload: T): Promise<void> {
this.logger.info("[Saga] Starting rollback...");
// 按相反顺序执行补偿操作
for (const step of [...completedSteps].reverse()) {
try {
this.logger.info(`[Saga] Compensating for step: ${step.name}`);
await step.compensate(payload);
this.logger.info(`[Saga] Compensation successful for step: ${step.name}`);
} catch (compensationError) {
// 这里的坑在于:如果补偿操作失败,系统就进入了一个需要人工干预的危险状态。
// 在生产环境中,这里需要发出严重告警,并记录详细的上下文信息。
this.logger.error(
`[Saga] CRITICAL: Compensation for step ${step.name} failed. Manual intervention required.`,
compensationError
);
// 尽管补偿失败,我们依然需要继续尝试回滚其他步骤
}
}
this.logger.info("[Saga] Rollback finished.");
}
}
这个编排器的设计有几个关键点:
- 通用性: 它通过泛型
T
接受任意类型的载荷(payload),不与具体业务逻辑耦合。 - 原子性步骤:
SagaStep
接口清晰地定义了action
和compensate
,强制开发者思考正向和逆向操作。 - 状态追踪:
completedSteps
数组是Saga执行状态的内存快照,用于驱动回滚。 - 日志: 注入一个
SagaLogger
接口,这在调试和测试时至关重要。我们可以传入console
,也可以在测试中传入一个mock logger来断言日志行为。 - 健壮的回滚:
rollback
方法即使在某个补偿操作失败时也会继续尝试其他的,并记录严重错误。这是务实的设计,因为停止回滚可能会让系统状态更糟。
将Saga应用于Delta Lake的用户注册场景
现在,我们用这个编排器来重构之前的用户注册逻辑。
// src/user_registration_saga.ts
import { DeltaTable, writeDelta } from "@delta-rs/delta-js";
import { RecordBatch } from "apache-arrow";
import { SagaOrchestrator, SagaStep } from "./saga_orchestrator";
import { makeTable } from "apache-arrow";
// 定义Saga的载荷
export interface UserRegistrationPayload {
userId: string;
email: string;
fullName: string;
bio: string;
// 注入表路径,便于测试隔离
usersTablePath: string;
profilesTablePath: string;
// 用于在测试中注入失败
simulateFailureInStep?: string;
}
// 辅助函数,封装Delta Lake写入逻辑
async function appendToDeltaTable(tablePath: string, data: any[]) {
if (data.length === 0) return;
const arrowTable = makeTable(data);
await writeDelta(tablePath, arrowTable, { mode: "append" });
}
// 第一个步骤:创建用户
const createUsersStep: SagaStep<UserRegistrationPayload> = {
name: "CreateUserRecord",
action: async (payload) => {
const userRecord = [{
id: payload.userId,
email: payload.email,
created_at: Date.now(),
}];
await appendToDeltaTable(payload.usersTablePath, userRecord);
if (payload.simulateFailureInStep === "CreateUserRecord") {
throw new Error("Simulated failure in CreateUserRecord action");
}
},
// 补偿操作:删除用户
compensate: async (payload) => {
// 这里的删除操作需要是幂等的。即使因为重试执行多次,结果也应相同。
const table = await DeltaTable.load(payload.usersTablePath);
await table.delete(`id = '${payload.userId}'`);
},
};
// 第二个步骤:创建用户资料
const createProfileStep: SagaStep<UserRegistrationPayload> = {
name: "CreateProfileRecord",
action: async (payload) => {
if (payload.simulateFailureInStep === "CreateProfileRecord") {
throw new Error("Simulated failure in CreateProfileRecord action");
}
const profileRecord = [{
user_id: payload.userId,
full_name: payload.fullName,
bio: payload.bio,
}];
await appendToDeltaTable(payload.profilesTablePath, profileRecord);
},
// 补偿操作:删除用户资料
compensate: async (payload) => {
const table = await DeltaTable.load(payload.profilesTablePath);
await table.delete(`user_id = '${payload.userId}'`);
},
};
// 创建并配置Saga
export function createUserRegistrationSaga(logger?: any): SagaOrchestrator<UserRegistrationPayload> {
const saga = new SagaOrchestrator<UserRegistrationPayload>(logger);
saga.addStep(createUsersStep).addStep(createProfileStep);
return saga;
}
通过这种方式,业务逻辑被清晰地拆分到各个SagaStep
中,主流程代码只需配置和执行Saga。simulateFailureInStep
属性是为单元测试埋下的钩子,它允许我们在特定步骤精确地注入故障。
sequenceDiagram participant Client participant Orchestrator participant CreateUserAction participant CreateUserCompensate participant CreateProfileAction participant CreateProfileCompensate Client->>+Orchestrator: execute(payload) Orchestrator->>+CreateUserAction: action(payload) CreateUserAction-->>-Orchestrator: success Orchestrator->>+CreateProfileAction: action(payload) Note right of CreateProfileAction: 发生致命错误! CreateProfileAction-->>-Orchestrator: failure Orchestrator->>+CreateUserCompensate: compensate(payload) Note left of Orchestrator: 开始回滚 CreateUserCompensate-->>-Orchestrator: success Orchestrator-->>-Client: Saga Failed Error
使用Jest进行确定性单元测试
没有经过严格测试的分布式逻辑是不可信的。Saga模式最大的挑战之一就是如何有效地测试其补偿路径。借助Jest的mocking和spy能力,我们可以不依赖真实的延时或网络抖动,创建出确定性的、可重复的失败场景测试。
这是我们的测试文件结构和准备工作:
/
├── src/
│ ├── saga_orchestrator.ts
│ └── user_registration_saga.ts
├── tests/
│ ├── saga_orchestrator.test.ts
│ └── user_registration.integration.test.ts
├── test_tables/ (测试时动态创建)
└── package.json
package.json
依赖:
{
"devDependencies": {
"@types/jest": "^29.5.12",
"@types/node": "^20.11.24",
"jest": "^29.7.0",
"ts-jest": "^29.1.2",
"typescript": "^5.3.3"
},
"dependencies": {
"@delta-rs/delta-js": "^0.9.0",
"apache-arrow": "^15.0.0",
"fs-extra": "^11.2.0"
}
}
现在,编写集成测试来验证我们的Saga实现。
// tests/user_registration.integration.test.ts
import * as path from "path";
import * as fs from "fs-extra";
import { DeltaTable, DeltaTableJson } from "@delta-rs/delta-js";
import { createUserRegistrationSaga, UserRegistrationPayload } from "../src/user_registration_saga";
const TEST_DIR = path.join(__dirname, "../test_tables");
const USERS_TABLE_PATH = path.join(TEST_DIR, "users");
const PROFILES_TABLE_PATH = path.join(TEST_DIR, "profiles");
// 辅助函数,用于检查Delta表中的记录数
async function getTableCount(tablePath: string, filter?: string): Promise<number> {
if (!await fs.pathExists(path.join(tablePath, "_delta_log"))) {
return 0;
}
const table = await DeltaTable.load(tablePath);
const selection = filter ? table.selectBy({ filter }) : table;
const data = await selection.toArray();
return data.length;
}
describe("UserRegistrationSaga Integration Test", () => {
// 在每个测试用例开始前,清理并重新创建测试表目录
beforeEach(async () => {
await fs.remove(TEST_DIR);
await fs.ensureDir(USERS_TABLE_PATH);
await fs.ensureDir(PROFILES_TABLE_PATH);
});
// 测试结束后清理
afterAll(async () => {
await fs.remove(TEST_DIR);
});
test("should complete successfully when all steps succeed", async () => {
const saga = createUserRegistrationSaga();
const payload: UserRegistrationPayload = {
userId: "user-123",
email: "[email protected]",
fullName: "John Doe",
bio: "A test user.",
usersTablePath: USERS_TABLE_PATH,
profilesTablePath: PROFILES_TABLE_PATH,
};
await saga.execute(payload);
// 断言:两个表都应该有且只有一条记录
const userCount = await getTableCount(USERS_TABLE_PATH, `id = 'user-123'`);
const profileCount = await getTableCount(PROFILES_TABLE_PATH, `user_id = 'user-123'`);
expect(userCount).toBe(1);
expect(profileCount).toBe(1);
});
test("should rollback correctly if the second step (CreateProfile) fails", async () => {
const saga = createUserRegistrationSaga();
const payload: UserRegistrationPayload = {
userId: "user-456",
email: "[email protected]",
fullName: "Jane Fail",
bio: "This will fail.",
usersTablePath: USERS_TABLE_PATH,
profilesTablePath: PROFILES_TABLE_PATH,
// 关键:注入失败
simulateFailureInStep: "CreateProfileRecord",
};
// 断言:Saga执行应该抛出异常
await expect(saga.execute(payload)).rejects.toThrow("Saga execution failed at step: CreateProfileRecord");
// 断言:补偿逻辑应该已经执行,两个表中都不应该有该用户的记录
const userCount = await getTableCount(USERS_TABLE_PATH, `id = 'user-456'`);
const profileCount = await getTableCount(PROFILES_TABLE_PATH, `user_id = 'user-456'`);
expect(userCount).toBe(0);
expect(profileCount).toBe(0);
console.log("Verified that rollback cleans up data from all tables.");
});
test("should not write any data if the first step (CreateUser) fails", async () => {
const saga = createUserRegistrationSaga();
const payload: UserRegistrationPayload = {
userId: "user-789",
email: "[email protected]",
fullName: "Peter Panic",
bio: "Fails at the very beginning.",
usersTablePath: USERS_TABLE_PATH,
profilesTablePath: PROFILES_TABLE_PATH,
simulateFailureInStep: "CreateUserRecord",
};
await expect(saga.execute(payload)).rejects.toThrow("Saga execution failed at step: CreateUserRecord");
// 断言:因为第一步就失败了,没有任何数据应该被写入
const userCount = await getTableCount(USERS_TABLE_PATH);
const profileCount = await getTableCount(PROFILES_TABLE_PATH);
expect(userCount).toBe(0);
expect(profileCount).toBe(0);
});
});
这个测试套件的核心价值在于 simulateFailureInStep
。它让我们能够精确控制哪个环节出错,然后验证系统的最终状态是否符合预期。
- Happy Path 测试: 验证了在一切正常时,数据能被正确写入。
- 第二步失败测试: 这是最关键的测试。它模拟了
CreateProfileRecord
步骤的失败。测试断言不仅Saga会抛出异常,更重要的是,之前成功执行的CreateUserRecord
步骤被正确补偿了(即users
表中的记录被删除了)。这验证了我们的回滚逻辑。 - 第一步失败测试: 验证了如果Saga在初始阶段就失败,不会留下任何中间状态。
这种基于故障注入的单元/集成测试方法,将原本难以捉摸的分布式系统行为,转化为了可以在CI/CD流水线中反复运行、稳定验证的确定性测试用例。
局限性与未来迭代路径
我们构建的这个内存Saga编排器在许多场景下已经足够有用,但它并非银弹。一个务实的工程师必须清楚其边界。
首先,编排器本身是单点故障。如果在 action
执行成功后、将该步骤加入 completedSteps
数组前,或者在启动 rollback
流程时,编排器进程崩溃,Saga的状态就会丢失,导致系统处于不一致状态且无法自动恢复。在对数据一致性要求极高的场景下,需要一个持久化的Saga日志。每次Saga状态变更(步骤开始、步骤成功、开始回滚等)都应先写入日志(例如,存入另一个高可用的数据库或Delta表),然后再执行操作。这样即使编排器重启,也能从日志中恢复Saga的上下文并继续执行或回滚。
其次,补偿操作失败的处理。当前实现只是记录了一个严重错误。在生产系统中,这应该触发一个告警,进入一个“死信队列”,并由运维人员或自动化的修复脚本进行干预。设计补偿操作时,必须尽最大努力使其成功,例如通过无限重试、采用更底层的API等。
最后,这个实现是编排(Orchestration)模式,还有一个对等的模式叫做协同(Choreography),服务通过监听彼此发布的事件来触发自己的本地事务或补偿操作。协同模式解耦度更高,但缺点是整个业务流程不清晰,难以监控和调试。我们选择的编排模式,虽然引入了编排器这个中心点,但换来的是对复杂流程的显式控制和更强的可测试性。