MyBatis流式查询与Dask并行计算的异构集成方案


一、 定义问题:跨越语言边界的TB级数据计算瓶颈

在维护一个成熟的Java单体应用时,我们面临一个典型的两难困境。该应用的核心是一个高度规范化的PostgreSQL数据库,数据量已达TB级别,承载着所有在线事务处理(OLTP)负载。另一方面,数据科学团队(使用Python技术栈)需要对这些数据进行复杂的、计算密集型的分析,例如用户行为建模、风险评估等。

直接在生产数据库上运行这些分析查询是不可行的。这些查询通常涉及全表扫描、复杂的JOIN和聚合,会消耗大量CPU和IO资源,产生巨量锁竞争,足以拖垮整个在线服务。

二、 方案A:内存加载与分析(Pandas/JDBC)

最初的尝试是简单直接的:通过JDBC连接,使用MyBatis执行一个查询,将结果集一次性拉取到Python端,再加载到Pandas DataFrame中进行分析。

优势

  • 实现简单:几行代码就能建立连接并获取数据。
  • 生态成熟JayDeBeApiPyJDBC等库可以轻易地在Python和Java数据库驱动之间建立桥梁。

劣势

这是一个在小数据集上看似美好,但在生产规模下必然失败的方案。

  1. 内存溢出(OOM):TB级的数据表,哪怕只是查询其中一个千万行级别的大表,所需内存也远超单个分析节点的承载能力。Pandas将整个数据集加载到内存中,这是其核心瓶颈。
  2. 数据库压力:一个SELECT * FROM large_table查询会强制数据库将数亿行数据一次性读入内存并推向网络缓冲区,这对数据库的内存和IO是毁灭性的。
  3. 阻塞式IO:Python进程会完全阻塞,直到JDBC驱动接收完所有数据。对于一个可能耗时数十分钟的查询,这意味着资源的巨大浪费和极差的响应性。

在真实项目中,这种方案在数据量超过百万行时便已举步维艰,对于我们的目标规模,它从一开始就被排除了。

三、 方案B:离线数据转储(ETL)

既然在线查询不可行,很自然地会想到将数据离线化。即编写一个定时任务,将所需数据从生产库导出为Parquet或CSV文件,存放在对象存储(如S3)或HDFS上。然后,Dask集群从这些文件中读取数据进行并行处理。

优势

  • 读写分离:分析负载与生产数据库完全解耦,保障了在线服务的稳定性。
  • 并行处理友好:Dask天生擅长处理分区文件。将大表导出为多个Parquet文件,Dask可以高效地并行读取和处理。
  • 持久化:数据转储后可被多次复用,用于不同的分析任务。

劣势

这个方案看似解决了核心问题,但在我们的场景中引入了新的、无法接受的复杂性与局限。

  1. 数据时效性差:ETL任务通常是按小时或天为周期执行。对于需要近实时数据的风险分析场景,小时级的数据延迟是致命的。
  2. 存储成本与管理复杂性:TB级的原始数据每天生成增量快照,存储成本会迅速膨胀。数据版本管理、生命周期策略、权限控制等都成为新的运维负担。
  3. “导出”过程本身就是一种冲击:在业务高峰期执行大规模数据导出,本质上也是一次对数据库的重度读取操作。虽然比分析查询要简单,但依然会抢占宝贵的IO资源。
  4. ETL管道脆弱性:需要维护一套独立的ETL调度系统(如Airflow),处理任务依赖、失败重试、数据质量校验等,这本身就是一个复杂的工程领域。

在权衡后,我们认为ETL方案虽然可行,但其引入的延迟和运维复杂性与我们的业务需求不符。我们需要一个既能隔离负载,又能提供较高数据时效性的方案。

四、 最终选择:基于gRPC流的在线数据管道

我们的最终决策是构建一个专用的数据服务层,它作为生产数据和分析集群之间的桥梁。这个服务将以流式(Streaming)的方式提供数据,从而在源头解决内存瓶颈。

核心架构:
一个Java gRPC服务,内嵌MyBatis,负责与数据库交互。它暴露一个服务器流式RPC接口。Python端的Dask集群作为gRPC客户端,连接此服务,消费数据流,并将其分发给Dask workers进行并行计算。

graph TD
    subgraph Python/Dask Cluster
        Dask_Client[Dask Client Application] --> Dask_Scheduler[Dask Scheduler]
        Dask_Scheduler -- "Distributes tasks" --> Dask_Worker_1[Dask Worker 1]
        Dask_Scheduler -- "Distributes tasks" --> Dask_Worker_2[Dask Worker 2]
        Dask_Scheduler -- "Distributes tasks" --> Dask_Worker_N[Dask Worker N]
    end

    subgraph gRPC Data Bridge
        Dask_Worker_1 -- "gRPC Stream Request" --> gRPC_Client_Stub_1
        Dask_Worker_2 -- "gRPC Stream Request" --> gRPC_Client_Stub_2
        Dask_Worker_N -- "gRPC Stream Request" --> gRPC_Client_Stub_N

        gRPC_Client_Stub_1 & gRPC_Client_Stub_2 & gRPC_Client_Stub_N -- "TCP Connection" --> Java_gRPC_Server[Java gRPC Server]
    end
    
    subgraph Java Data Service
        Java_gRPC_Server -- "Invokes service logic" --> Data_Service_Layer[Data Service Layer]
        Data_Service_Layer -- "Executes query with ResultHandler" --> MyBatis_Mapper[MyBatis Mapper]
        MyBatis_Mapper -- "JDBC Streaming" --> PostgreSQL_DB[(Production PostgreSQL DB)]
    end

    style Dask_Client fill:#b9f,stroke:#333,stroke-width:2px
    style Java_gRPC_Server fill:#f96,stroke:#333,stroke-width:2px

选择理由:

  1. 内存效率:这是最关键的优势。通过MyBatis的ResultHandler和JDBC的游标(cursor),数据在Java服务侧是逐条从数据库读取并处理的,服务本身几乎不消耗内存。数据被立即写入gRPC流,发送给客户端。Dask客户端同样是逐条接收,并立即分发,避免了在任何单点上加载整个数据集。
  2. 受控的数据库访问:MyBatis提供了对SQL的精细化控制。我们可以编写高度优化的查询,并设置fetchSize参数,精确控制每次网络往返从数据库拉取的数据行数,将对生产库的影响降至最低。
  3. 近实时性:数据直接从生产库流出,消除了ETL方案的延迟。
  4. 跨语言的强类型契约:gRPC使用Protocol Buffers定义服务接口和数据结构,确保了Java服务端和Python客户端之间严格的类型安全和清晰的API契约。
  5. 内置反压机制:gRPC的HTTP/2流控机制天然提供了反压(Backpressure)。如果Dask端处理速度跟不上,gRPC流会自动减慢数据发送速度,防止消费者被压垮,并最终将压力传导回数据库游标的读取,形成一个优雅的端到端流控。

五、 核心实现概览

1. Protocol Buffers 定义

这是我们数据契约的基石。定义一个服务DataStreamer和一个流式方法streamRecords,以及数据记录的结构TransactionRecord

// src/main/proto/data_streamer.proto
syntax = "proto3";

package com.mycompany.data.streamer;

option java_multiple_files = true;
option java_package = "com.mycompany.data.streamer.grpc";

import "google/protobuf/timestamp.proto";

// 数据流服务定义
service DataStreamer {
  // 服务端流式RPC,用于传输交易记录
  // 客户端发送一个请求,服务端以流的形式返回多条记录
  rpc streamTransactions(StreamRequest) returns (stream TransactionRecord) {}
}

// 流式请求参数
message StreamRequest {
  // 可以用来传递查询参数,如日期范围等
  string start_date = 1;
  string end_date = 2;
  int32 batch_size = 3; // 示例:客户端可以建议批次大小
}

// 单条交易记录的数据结构
message TransactionRecord {
  string transaction_id = 1;
  string user_id = 2;
  double amount = 3;
  string currency = 4;
  google.protobuf.Timestamp transaction_time = 5;
  string status = 6;
}

2. Java gRPC 服务端与 MyBatis ResultHandler

这是将数据库记录转化为gRPC流的核心。

Maven 依赖 (pom.xml):

<dependencies>
    <!-- MyBatis -->
    <dependency>
        <groupId>org.mybatis</groupId>
        <artifactId>mybatis</artifactId>
        <version>3.5.11</version>
    </dependency>
    <!-- PostgreSQL Driver -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.5.1</version>
    </dependency>
    <!-- gRPC -->
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-netty-shaded</artifactId>
        <version>1.50.2</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-protobuf</artifactId>
        <version>1.50.2</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-stub</artifactId>
        <version>1.50.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.5</version>
    </dependency>
</dependencies>

MyBatis Mapper 接口:
注意方法的第二个参数是ResultHandler。MyBatis在执行此方法时,不会将结果集加载到List中,而是每获取到一条记录,就调用一次ResultHandlerhandleResult方法。

// com.mycompany.data.mapper.TransactionMapper.java
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.session.ResultHandler;
import com.mycompany.data.model.Transaction;

public interface TransactionMapper {
    /**
     * 流式查询交易记录
     * @param handler 用于处理每一条结果记录的回调处理器
     */
    void streamAllTransactions(ResultHandler<Transaction> handler);
}

Mapper XML 文件:
关键在于fetchSize="-2147483648" (即 Integer.MIN_VALUE)。这对PostgreSQL JDBC驱动是一个特殊信号,告知它启用游标模式(cursor-based streaming),一次只从数据库获取少量行,而不是将整个结果集缓存到客户端内存。

<!-- com/mycompany/data/mapper/TransactionMapper.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.mycompany.data.mapper.TransactionMapper">
    <select id="streamAllTransactions" resultType="com.mycompany.data.model.Transaction" fetchSize="-2147483648">
        SELECT transaction_id, user_id, amount, currency, transaction_time, status
        FROM transactions
        -- 在真实项目中,这里应该有WHERE子句和ORDER BY
    </select>
</mapper>

gRPC 服务实现:

// com.mycompany.data.streamer.service.DataStreamerImpl.java
import com.mycompany.data.mapper.TransactionMapper;
import com.mycompany.data.model.Transaction;
import com.mycompany.data.streamer.grpc.*;
import io.grpc.stub.StreamObserver;
import org.apache.ibatis.session.ResultContext;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import com.google.protobuf.Timestamp;
import java.time.ZoneOffset;

public class DataStreamerImpl extends DataStreamerGrpc.DataStreamerImplBase {

    private final SqlSessionFactory sqlSessionFactory;

    public DataStreamerImpl(SqlSessionFactory sqlSessionFactory) {
        this.sqlSessionFactory = sqlSessionFactory;
    }

    @Override
    public void streamTransactions(StreamRequest request, StreamObserver<TransactionRecord> responseObserver) {
        try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
            TransactionMapper mapper = sqlSession.getMapper(TransactionMapper.class);
            
            // 这是关键部分:将gRPC的StreamObserver包装在MyBatis的ResultHandler中
            mapper.streamAllTransactions(new ResultHandler<Transaction>() {
                @Override
                public void handleResult(ResultContext<? extends Transaction> resultContext) {
                    Transaction transaction = resultContext.getResultObject();
                    
                    // 将领域模型对象转换为gRPC消息对象
                    TransactionRecord record = TransactionRecord.newBuilder()
                        .setTransactionId(transaction.getTransactionId())
                        .setUserId(transaction.getUserId())
                        .setAmount(transaction.getAmount())
                        .setCurrency(transaction.getCurrency())
                        .setTransactionTime(Timestamp.newBuilder()
                            .setSeconds(transaction.getTransactionTime().toEpochSecond(ZoneOffset.UTC))
                            .build())
                        .setStatus(transaction.getStatus())
                        .build();

                    // 将记录发送到流中
                    responseObserver.onNext(record);
                }
            });
            
            // 所有记录处理完毕,通知客户端流结束
            responseObserver.onCompleted();
        } catch (Exception e) {
            // 出现错误,通知客户端
            responseObserver.onError(e);
        }
    }
}

注意:在生产级代码中,需要更复杂的gRPC流控处理。例如,检查ServerCallStreamObserver.isReady()并在其返回false时暂停从数据库读取,在其onReady回调中恢复。但对于多数JDBC驱动,fetchSize已经提供了底层的缓冲和流控。

3. Python Dask 客户端

客户端负责消费流,并将其无缝地转换为Dask可以并行处理的数据结构。

Python 依赖 (requirements.txt):

dask[complete]==2022.12.1
grpcio==1.50.0
grpcio-tools==1.50.0
protobuf==4.21.9
pandas==1.5.2

客户端与Dask集成代码:

import grpc
import dask
import dask.bag as db
from dask.distributed import Client, LocalCluster
import pandas as pd

# 导入从 .proto 文件生成的代码
import data_streamer_pb2
import data_streamer_pb2_grpc

# 常量
GRPC_SERVER_ADDRESS = 'localhost:50051'

def record_stream_generator(stub):
    """
    一个生成器函数,它调用gRPC流式方法并逐一yield记录。
    这是连接 Dask 和 gRPC 流的关键。
    """
    try:
        # 创建一个请求对象
        request = data_streamer_pb2.StreamRequest(batch_size=1000)
        
        # 调用流式RPC,它返回一个迭代器
        response_iterator = stub.streamTransactions(request)
        
        print("Client: Starting to receive stream...")
        count = 0
        for record in response_iterator:
            # 将Protobuf对象转换为更Pythonic的字典或元组
            yield {
                'transaction_id': record.transaction_id,
                'user_id': record.user_id,
                'amount': record.amount,
                'currency': record.currency,
                'transaction_time': pd.to_datetime(record.transaction_time.seconds, unit='s'),
                'status': record.status,
            }
            count += 1
            if count % 10000 == 0:
                print(f"Client: Received {count} records...")

        print(f"Client: Stream finished. Total records: {count}")

    except grpc.RpcError as e:
        print(f"An RPC error occurred: {e.status()}: {e.details()}")
        # 在生产环境中,这里应该有更健壮的错误处理和重试逻辑

def main():
    """
    主执行函数:设置Dask集群,消费gRPC流,并执行并行计算。
    """
    # 1. 设置 Dask 集群 (本地测试)
    # 在生产环境中,这里会连接到一个远程的Dask集群
    cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit='2GB')
    client = Client(cluster)
    print(f"Dask Dashboard is running at: {client.dashboard_link}")

    # 2. 创建 gRPC 通道和存根
    channel = grpc.insecure_channel(GRPC_SERVER_ADDRESS)
    stub = data_streamer_pb2_grpc.DataStreamerStub(channel)

    # 3. 将gRPC流生成器包装成Dask Bag
    # dask.bag.from_sequence 会惰性地从生成器中拉取数据,
    # 并将数据分成多个分区,分发给不同的worker。
    # 这是实现并行处理的核心步骤。
    # npartitions 参数建议Dask将数据流切分成多少个块来并行处理。
    b = db.from_sequence(record_stream_generator(stub), npartitions=16)

    # 4. 定义并执行一个非平凡的Dask计算
    # 例如:计算每个用户的总交易额和平均交易额
    
    # 按 user_id 分组
    user_groups = b.group_by(lambda record: record['user_id'])

    # 定义一个聚合函数
    def aggregate_user_data(group):
        user_id, records = group
        record_list = list(records)
        total_amount = sum(r['amount'] for r in record_list)
        count = len(record_list)
        avg_amount = total_amount / count if count > 0 else 0
        return {
            'user_id': user_id,
            'total_amount': total_amount,
            'transaction_count': count,
            'avg_amount': avg_amount
        }

    # 在每个分区上并行应用聚合函数
    user_analytics = user_groups.map(aggregate_user_data)

    # 5. 触发计算并获取结果
    # .compute() 会阻塞直到整个计算图执行完毕
    print("Dask: Starting computation...")
    results = user_analytics.compute()
    print("Dask: Computation finished.")

    # 打印前10条结果
    print("Top 10 user analytics results:")
    print(results[:10])

    # 6. 关闭资源
    client.close()
    cluster.close()
    channel.close()


if __name__ == '__main__':
    main()

六、 架构的扩展性与局限性

该方案成功地解决了我们最初面临的核心问题,即在不影响生产系统稳定性和不引入高昂ETL成本的前提下,对大规模在线数据进行复杂的并行计算。

扩展性:

  • 多数据源:可以轻易地在Java服务中增加更多的Mapper和gRPC方法,以支持从不同的表或甚至不同的数据库中流式传输数据。
  • 动态查询StreamRequest可以被扩展,以接受更复杂的参数,例如动态的WHERE条件或字段列表,从而让Python端拥有更大的查询灵活性,尽管这会增加服务端的复杂性和安全风险。
  • 服务化:这个数据流服务本身可以被公司内部其他需要访问这部分数据的系统复用,成为一个标准化的数据出口。

局限性:

  • 序列化开销:gRPC的Protobuf序列化/反序列化虽然高效,但对于海量数据流,其CPU开销仍然是存在的。与直接内存访问或二进制文件读取相比,这是一个额外的性能成本。
  • 查询逻辑中心化:所有的SQL逻辑都被封装在Java服务中。这对于数据治理和性能控制是优点,但对于需要进行探索性数据分析(Ad-hoc Query)的数据科学家来说,这是一个巨大的限制。他们无法自由地编写和测试新的查询。
  • 网络瓶颈:整个方案的吞吐量上限受限于Java服务与Dask集群之间的网络带宽和延迟。
  • 适用边界:此架构并非数据仓库或数据湖的替代品。它最适用于目标明确、查询模式相对固定的计算密集型任务,而不是通用的商业智能(BI)或数据探索场景。对于后者,传统的ETL到数据仓库的模式仍然是更合适的选择。

一个可能的未来迭代方向是,探索一种机制,允许客户端安全地传递某种形式的抽象查询树(AST),由服务端解析并转换为安全的SQL,从而在灵活性和安全性之间找到更好的平衡点。


  目录