团队接手了一个遗留的数据分析项目,痛点非常明确:每个月,分析师需要手动运行一个本地的Python脚本,处理大约500万行从业务系统导出的CSV文件。这个脚本使用Pandas进行一系列复杂的特征衍生和数据清洗,最终将结果写入HBase,用作后续模型训练和在线查询的特征源。整个过程充满了不确定性:脚本执行时间长达数小时,中途一旦网络抖动或HBase集群有短暂波动,整个任务就得从头再来。更糟糕的是,没人能说清失败时到底有多少数据被写入了,数据一致性全靠手动检查和祈祷。
这种刀耕火种的作业方式在生产环境中是不可接受的。我们的首要任务,就是将这个脆弱的手动流程改造为一个稳定、自动化、可重复执行的ETL管道。目标很简单:分析师只需要将新的源CSV文件提交到Git仓库的特定目录,CI/CD系统就应该自动触发整个处理和加载流程,并确保无论流程成功、失败还是重跑,HBase中的数据状态都是最终一致的。
我们的技术栈选型很直接:
- CircleCI: 作为CI/CD的执行引擎。它的配置相对直观,对Docker的支持也很好,便于我们构建一个包含所有依赖的、干净的执行环境。
- Pandas: 核心的数据处理库,这是既定事实,我们要做的是优化其用法,而不是替换它。
- HBase: 目标数据存储,我们需要通过Python与其高效、稳定地交互。
核心挑战不在于打通这几个工具,而在于如何设计整个流程的幂等性和原子性。幂等性意味着对同一个输入数据运行管道一次和运行多次,最终系统的状态应该是完全相同的。原子性则要求我们尽量减少部分成功、部分失败的中间状态。
第一步:重构数据处理脚本,注入幂等性
原始脚本是一个巨大的单体函数,混合了文件读取、数据转换和数据库写入。第一步就是解耦。我们将它拆分为三个核心部分:数据提取(extract
)、数据转换(transform
)和数据加载(load
)。
关键在于load
阶段的设计。向HBase写入数据,为了性能,必须使用批量操作(Batching)。happybase
库提供了Batch
对象,可以显著减少网络往返。但如果一个批次在提交过程中失败,一部分数据可能已经写入成功。这就是问题的根源。
我们的解决方案是围绕HBase的RowKey设计。HBase是基于RowKey进行存储和检索的,一个设计良好的RowKey是实现幂等性的关键。我们决定使用源数据中具有唯一业务标识的字段(例如 user_id
和 event_timestamp
的组合)进行SHA-256哈希,生成一个确定性的、固定长度的RowKey。
# scripts/process_and_load.py
import os
import sys
import logging
import hashlib
import pandas as pd
import happybase
# --- Configuration ---
# 在真实项目中,这些配置应来自环境变量
HBASE_HOST = os.getenv('HBASE_HOST', 'localhost')
HBASE_PORT = int(os.getenv('HBASE_PORT', 9090))
HBASE_TABLE = os.getenv('HBASE_TABLE', 'user_features')
SOURCE_DATA_PATH = os.getenv('SOURCE_DATA_PATH', 'data/source.csv')
BATCH_SIZE = int(os.getenv('BATCH_SIZE', 1000))
# --- Logging Setup ---
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
stream=sys.stdout
)
def generate_row_key(user_id: str, event_ts: int) -> str:
"""
根据业务唯一键生成确定性的HBase RowKey。
这是实现幂等性的核心:同样的输入永远产生同样的RowKey。
"""
key_str = f"{user_id}-{event_ts}"
return hashlib.sha256(key_str.encode('utf-8')).hexdigest()
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
"""
执行复杂的数据转换。
这里的逻辑是业务核心,也是最需要被测试的部分。
"""
logging.info(f"开始数据转换,原始数据行数: {len(df)}")
# 1. 数据清洗:处理缺失值
df['login_channel'].fillna('UNKNOWN', inplace=True)
df['user_age'].fillna(df['user_age'].median(), inplace=True)
# 2. 类型转换,处理潜在的脏数据
df['user_age'] = pd.to_numeric(df['user_age'], errors='coerce').astype('Int64')
df['event_timestamp'] = pd.to_numeric(df['event_timestamp'], errors='coerce')
df.dropna(subset=['user_id', 'event_timestamp', 'user_age'], inplace=True)
# 3. 特征衍生:基于现有数据创建新特征
df['is_weekend'] = pd.to_datetime(df['event_timestamp'], unit='s').dt.dayofweek >= 5
df['login_device_type'] = df['device_id'].apply(lambda x: 'MOBILE' if str(x).startswith('m_') else 'PC')
# 4. 生成RowKey
df['row_key'] = df.apply(lambda row: generate_row_key(row['user_id'], row['event_timestamp']), axis=1)
logging.info(f"数据转换完成,有效数据行数: {len(df)}")
return df
def load_to_hbase(df: pd.DataFrame):
"""
将处理后的DataFrame批量加载到HBase。
"""
try:
connection = happybase.Connection(host=HBASE_HOST, port=HBASE_PORT, timeout=20000)
table = connection.table(HBASE_TABLE)
logging.info(f"成功连接到HBase: {HBASE_HOST}:{HBASE_PORT}, 表: {HBASE_TABLE}")
except Exception as e:
logging.error(f"连接HBase失败: {e}")
# 在CI环境中,连接失败应该直接导致任务失败
sys.exit(1)
total_rows = len(df)
processed_count = 0
with table.batch(batch_size=BATCH_SIZE) as b:
logging.info(f"开始批量写入数据,批次大小: {BATCH_SIZE}")
for index, row in df.iterrows():
try:
row_key = row['row_key']
# 构建写入HBase的数据结构
# 列族 'profile' 和 'metrics' 必须预先在HBase中创建好
# 所有值都必须是字符串或字节
hbase_data = {
b'profile:user_id': str(row['user_id']).encode('utf-8'),
b'profile:user_age': str(row['user_age']).encode('utf-8'),
b'profile:login_channel': str(row['login_channel']).encode('utf-8'),
b'metrics:is_weekend': str(row['is_weekend']).encode('utf-8'),
b'metrics:device_type': str(row['login_device_type']).encode('utf-8'),
b'metrics:event_ts': str(row['event_timestamp']).encode('utf-8')
}
b.put(row_key.encode('utf-8'), hbase_data)
processed_count += 1
if processed_count % (BATCH_SIZE * 10) == 0:
logging.info(f"已处理 {processed_count}/{total_rows} 行数据...")
except Exception as e:
# 这里的错误处理很关键。在一个批次内部,我们选择记录错误并继续
# 这样不会因为少数几行脏数据导致整个任务失败。
logging.warning(f"处理行 {index} 时发生错误,已跳过。错误: {e}, 数据: {row.to_dict()}")
continue
# batch上下文管理器退出时会自动提交所有批次
connection.close()
logging.info(f"数据加载完成。总计处理 {processed_count}/{total_rows} 行。")
def main():
"""
ETL主流程
"""
logging.info("ETL流程启动...")
try:
df = pd.read_csv(SOURCE_DATA_PATH)
logging.info(f"成功读取源数据: {SOURCE_DATA_PATH}")
except FileNotFoundError:
logging.error(f"源数据文件未找到: {SOURCE_DATA_PATH}")
sys.exit(1)
transformed_df = transform_data(df)
if not transformed_df.empty:
load_to_hbase(transformed_df)
else:
logging.warning("没有有效数据需要加载。")
logging.info("ETL流程执行完毕。")
if __name__ == '__main__':
main()
这个脚本通过确定性RowKey实现了幂等性。无论脚本运行多少次,同一行源数据总是会更新HBase中的同一行。HBase的put
操作本身就是“upsert”(更新或插入),这天然符合我们的需求。
第二步:为数据转换逻辑编写单元测试
在生产环境中,任何没有测试的代码都是一颗定时炸弹。数据处理逻辑尤其如此,一个微小的改动可能导致下游数据质量的巨大灾难。我们将为transform_data
函数编写单元测试,并将其作为CI流程的一部分。
# tests/test_transformations.py
import unittest
import pandas as pd
from pandas.testing import assert_frame_equal
import sys
import os
# 将脚本目录添加到Python路径,以便导入
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'scripts')))
from process_and_load import transform_data
class TestDataTransformations(unittest.TestCase):
def test_full_transformation_logic(self):
"""
测试完整的转换逻辑,包括清洗、类型转换和特征衍生。
"""
# 1. 准备测试输入数据
input_data = {
'user_id': ['u001', 'u002', 'u003', 'u004'],
'event_timestamp': [1678886400, 1678972800, 1678886401, 'invalid_ts'],
'user_age': [25, None, 30, 40],
'login_channel': ['APP', 'WEB', None, 'WECHAT'],
'device_id': ['m_abc', 'd_xyz', 'm_123', 'd_456']
}
input_df = pd.DataFrame(input_data)
# 2. 准备期望的输出数据
# 注意:我们不测试RowKey的具体哈希值,只验证该列是否存在
expected_data = {
'user_id': ['u001', 'u002', 'u003'],
'event_timestamp': [1678886400, 1678972800, 1678886401],
'user_age': pd.Series([25, 30, 30], dtype='Int64'), # None被中位数30填充
'login_channel': ['APP', 'WEB', 'UNKNOWN'], # None被'UNKNOWN'填充
'device_id': ['m_abc', 'd_xyz', 'm_123'],
'is_weekend': [False, True, False],
'login_device_type': ['MOBILE', 'PC', 'MOBILE']
}
expected_df = pd.DataFrame(expected_data)
# 3. 执行待测试函数
actual_df = transform_data(input_df.copy())
# 4. 断言
self.assertTrue('row_key' in actual_df.columns)
self.assertEqual(len(actual_df), 3) # 验证'invalid_ts'的行被删除
# 比较除了row_key之外的所有列
assert_frame_equal(
actual_df.drop(columns=['row_key']).reset_index(drop=True),
expected_df.reset_index(drop=True),
check_dtype=True
)
if __name__ == '__main__':
unittest.main()
第三步:配置CircleCI自动化流程
现在我们有了可执行的脚本和单元测试,是时候让CircleCI把它们串起来了。我们的.circleci/config.yml
文件需要定义一个完整的工作流:
- 环境准备: 使用一个包含Python的Docker镜像。
- 依赖安装: 从
requirements.txt
安装所有Python库。 - 代码测试: 运行我们编写的单元测试。只有测试通过,才能进行下一步。
- 数据加载: 执行主ETL脚本,将数据写入HBase。
一个常见的坑是在CI环境中如何处理HBase这样的外部服务依赖。在真实的大型项目中,CI环境应该能访问一个专用的测试或预发HBase集群。为了演示,我们在这里假设CI的执行器(runner)可以直接访问目标HBase。
另一个关键点是凭证管理。HBase的地址、端口等敏感信息绝不能硬编码在代码或配置文件中。CircleCI提供了Contexts
或Project Environment Variables
来安全地存储这些信息。
# .circleci/config.yml
version: 2.1
orbs:
python: circleci/[email protected]
jobs:
build-and-test:
docker:
- image: cimg/python:3.9
steps:
- checkout
- python/install-packages:
pkg-manager: pip
# 我们将在这个文件中列出 pandas, happybase, pytest 等
pip-dependency-file: requirements.txt
- run:
name: "Run Unit Tests for Data Transformation"
command: |
echo "Running data logic tests..."
pytest tests/
run-etl-to-hbase:
docker:
- image: cimg/python:3.9
# 这里的资源类可以根据数据量调整,数据量大时需要更多内存
resource_class: medium
environment:
# 这些环境变量将在CircleCI UI中配置
# HBASE_HOST: hbase.internal.mycompany.com
# HBASE_PORT: 9090
# HBASE_TABLE: user_features
SOURCE_DATA_PATH: data/latest_batch.csv
steps:
- checkout
- python/install-packages:
pkg-manager: pip
pip-dependency-file: requirements.txt
- run:
name: "Execute HBase Data Loading Script"
# 增加超时,因为数据加载可能很耗时
no_output_timeout: 30m
command: |
echo "Starting ETL process..."
python scripts/process_and_load.py
workflows:
version: 2
monthly-data-pipeline:
jobs:
- build-and-test
- run-etl-to-hbase:
requires:
- build-and-test
filters:
branches:
# 这个工作流只在main分支上发生变更时触发
only:
- main
这个配置文件定义了一个名为monthly-data-pipeline
的工作流。它首先执行build-and-test
作业。如果测试成功,它才会继续执行run-etl-to-hbase
作业。我们通过requires
关键字建立了这种依赖关系。通过分支过滤器,我们确保了只有合并到main
分支的代码(例如,一个新的数据文件被添加)才会触发生产数据加载。
整个流程的架构图如下:
graph TD A[Developer/Analyst pushes new data file to Git main branch] --> B{CircleCI Trigger}; B --> C[Job: build-and-test]; C --> D{Setup Python Environment}; D --> E[Install Dependencies]; E --> F[Run Pytest on transformation logic]; F -- Success --> G[Job: run-etl-to-hbase]; F -- Failure --> H[Pipeline Fails & Notifies]; G --> I{Setup Python Environment}; I --> J[Install Dependencies]; J --> K[Execute Python Script]; K --> L[Read CSV -> Pandas Transform -> Generate RowKeys]; L --> M[Batch Load to HBase]; M -- Success --> N[Pipeline Succeeds]; M -- Failure --> O[Pipeline Fails & Notifies]; subgraph HBase Cluster P[Table: user_features] end M --> P;
最终成果与局限性
通过这套改造,我们实现了一个健壮的、自动化的ETL管道。分析师的工作流程从“运行一个神秘脚本并祈祷”变成了“将数据文件提交到Git并获得明确的成功或失败反馈”。
- 可靠性: 单元测试保证了核心转换逻辑的质量。CI流程的原子性(测试失败则不加载)防止了有问题的代码污染生产数据。
- 幂等性: 基于确定性RowKey的设计,使得整个加载过程可以安全地重跑,无论是手动触发重试,还是修复bug后重新运行。
- 可观测性: CircleCI的输出提供了详细的执行日志,包括我们脚本中打印的日志。任何错误都可以被快速定位。
当然,这个方案并非完美。它本质上仍然是一个批处理流程,对于实时性要求高的场景并不适用。如果源数据文件变得极其巨大(例如TB级别),单机Pandas处理会遇到内存瓶颈,届时就需要引入Spark等分布式计算框架。此外,对于失败批次的处理,当前只是记录日志并跳过。一个更完善的系统可能会将这些失败的行发送到“死信队列”(Dead Letter Queue),以便后续进行人工排查和重新处理。未来的迭代方向可以是在CI流程中集成数据质量校验工具(如Great Expectations),在加载数据前执行一系列断言,确保数据符合预定义的规则,进一步提升数据管道的健壮性。