将一个独立的计算机视觉(CV)模型打包成服务并非难事,一个包含 Dockerfile
、requirements.txt
和一个 main.py
的 FastAPI 应用足以应付。然而,当需求演变为构建一个支持多租户、多模型、多版本的平台级服务时,这种简单的脚本化思维会迅速崩溃。代码库会因为混杂了模型加载、数据预处理、权限校验、资源编排和计费逻辑而变得无法维护。
在真实项目中,复杂性来源于不同业务关注点的纠缠。一个模型从研究员的实验品到生产环境中的稳定API,其生命周期中涉及的领域知识截然不同。领域驱动设计(DDD)的核心价值在于,它强制我们识别并隔离这些不同的领域,在各自的限界上下文(Bounded Context)中独立建模和演进。这正是解决平台级模型部署混乱的根本之道。
本文不讨论如何训练一个具体的CV模型,而是聚焦于如何运用DDD思想,设计并实现一个多租户CV模型部署平台的后端骨架。我们将划分出清晰的限界上下文,并用生产级的Python代码展示其核心实现,目标是构建一个逻辑清晰、易于扩展且与具体云服务商实现解耦的系统。
限界上下文的识别与划分
一个模型部署平台至少要处理以下几类问题:
- 谁能操作?—— 身份与租户管理。
- 什么是模型?—— 模型的元数据、版本、存储。
- 如何部署?—— 将模型版本转化为可访问的在线服务。
- 如何调用?—— 提供高性能的推理端点。
这些问题的本质、变化频率和技术要求各不相同。强行将它们塞入一个单体应用,就是技术债的开始。依据DDD,我们将其划分为四个核心的限界上下文:
- 身份与访问上下文 (Identity & Access Context): 负责处理租户(Tenant)、用户(User)和API密钥(ApiKey)。它的模型非常稳定,主要关注认证与授权。
- 模型生命周期上下文 (Model Lifecycle Context): 核心是
模型家族(ModelFamily)
和模型版本(ModelVersion)
这两个聚合根。它关心模型的元数据、验证标准、存储位置(如S3 URI),但不关心它如何被部署或运行。 - 部署编排上下文 (Deployment Orchestration Context): 它的职责是“翻译官”。它接收一个“部署某个模型版本”的业务请求,并将其翻译成对底层云服务(如AWS SageMaker, GCP Vertex AI, หรือ Kubernetes)的具体API调用。它的核心聚合根是
部署(Deployment)
。 - 推理服务上下文 (Inference Serving Context): 这是一个纯粹的“数据平面”组件。它的唯一职责就是根据指令加载指定模型,并以最低延迟处理推理请求。它无状态、高性能,并且对前三个上下文的复杂业务逻辑一无所知。
这四个上下文之间的关系可以通过下面的上下文映射图(Context Map)来表示。
graph TD subgraph Identity & Access Context A[Tenant] B[User] C[API Key] end subgraph Model Lifecycle Context D[ModelFamily] E[ModelVersion] end subgraph Deployment Orchestration Context F[Deployment] G[Endpoint] end subgraph Inference Serving Context H[Inference Worker] end style Inference Serving Context fill:#f9f,stroke:#333,stroke-width:2px User -- Manages --> ModelFamily User -- Requests --> Deployment Deployment -- "Reads Model Info via ACL" --> ModelVersion Deployment -- "Orchestrates Cloud Resources" --> G G -- "Routes Traffic to" --> H H -- "Loads Artifact from S3" --> ModelVersion %% ACL Definition linkStyle 2 stroke-width:2px,stroke:red,fill:none;
这里的关键在于部署编排上下文
与模型生命周期上下文
之间的交互。它们之间不共享数据库,而是通过一个明确定义的防腐层(Anti-Corruption Layer, ACL)进行通信。在我们的实现中,这个ACL就是一个内部API客户端。
核心实现:模型生命周期上下文
这个上下文是所有模型信息的权威来源。它的API相对简单,主要是CRUD操作,但其内部的数据模型和验证逻辑必须非常健壮。
我们使用FastAPI和Pydantic来构建服务,用SQLAlchemy来定义数据模型。
1. 数据模型 (models.py
)
# model_lifecycle_service/models.py
import uuid
from datetime import datetime
from sqlalchemy import (Column, String, DateTime, ForeignKey, JSON,
UniqueConstraint)
from sqlalchemy.orm import relationship, declarative_base
from sqlalchemy.dialects.postgresql import UUID
Base = declarative_base()
class Tenant(Base):
__tablename__ = "tenants"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name = Column(String, unique=True, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
class ModelFamily(Base):
"""
聚合根:模型家族。代表一个特定功能的模型,例如“人脸检测模型”。
它包含多个版本。
"""
__tablename__ = "model_families"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
tenant_id = Column(UUID(as_uuid=True), ForeignKey("tenants.id"), nullable=False)
name = Column(String, nullable=False)
description = Column(String)
created_at = Column(DateTime, default=datetime.utcnow)
versions = relationship("ModelVersion", back_populates="family", cascade="all, delete-orphan")
__table_args__ = (UniqueConstraint('tenant_id', 'name', name='_tenant_model_name_uc'),)
class ModelVersion(Base):
"""
实体:模型版本。一个具体的、不可变的模型文件。
"""
__tablename__ = "model_versions"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
family_id = Column(UUID(as_uuid=True), ForeignKey("model_families.id"), nullable=False)
version_tag = Column(String, nullable=False, default="latest") # e.g., "v1.0.0", "latest"
# 指向云存储中模型文件的URI,例如 s3://my-bucket/models/face-detector/v1.0.0/model.onnx
artifact_uri = Column(String, nullable=False)
# 模型的元数据,例如输入/输出格式,指标等
metadata = Column(JSON)
created_at = Column(DateTime, default=datetime.utcnow)
family = relationship("ModelFamily", back_populates="versions")
__table_args__ = (UniqueConstraint('family_id', 'version_tag', name='_family_version_tag_uc'),)
这里的ModelFamily
是聚合根。它确保了tenant_id
和name
的组合是唯一的,维护了业务不变量。ModelVersion
是其下的一个实体。
2. API接口与服务逻辑 (main.py
)
# model_lifecycle_service/main.py
import os
import uuid
from typing import List
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from .database import get_db, engine
from . import models
models.Base.metadata.create_all(bind=engine)
app = FastAPI(title="Model Lifecycle Service")
# --- DTOs (Data Transfer Objects) ---
class ModelVersionCreate(BaseModel):
version_tag: str = "latest"
artifact_uri: str = Field(..., example="s3://my-model-bucket/path/to/model.onnx")
metadata: dict = Field(default_factory=dict)
class ModelVersionResponse(BaseModel):
id: uuid.UUID
version_tag: str
artifact_uri: str
metadata: dict
created_at: str
class Config:
orm_mode = True
class ModelFamilyResponse(BaseModel):
id: uuid.UUID
name: str
description: str
versions: List[ModelVersionResponse]
class Config:
orm_mode = True
# --- API Endpoints ---
@app.post("/tenants/{tenant_id}/models/{model_name}/versions",
response_model=ModelVersionResponse, status_code=201)
def register_new_model_version(
tenant_id: uuid.UUID,
model_name: str,
version_data: ModelVersionCreate,
db: Session = Depends(get_db)
):
"""
注册一个新的模型版本到一个模型家族。如果家族不存在,则创建。
这是一个核心的写操作,必须是事务性的。
"""
# 检查租户是否存在(在真实系统中,这会通过调用Identity服务或共享的租户表)
tenant = db.query(models.Tenant).filter(models.Tenant.id == tenant_id).first()
if not tenant:
raise HTTPException(status_code=404, detail=f"Tenant '{tenant_id}' not found.")
# 查找或创建ModelFamily
family = db.query(models.ModelFamily).filter_by(tenant_id=tenant_id, name=model_name).first()
if not family:
family = models.ModelFamily(tenant_id=tenant_id, name=model_name, description=f"Auto-created family for {model_name}")
db.add(family)
# 必须先flush才能获取family.id
db.flush()
# 检查版本标签是否已存在
existing_version = db.query(models.ModelVersion).filter_by(family_id=family.id, version_tag=version_data.version_tag).first()
if existing_version:
raise HTTPException(status_code=409, detail=f"Version tag '{version_data.version_tag}' already exists for model '{model_name}'.")
# 在真实项目中,这里应该有逻辑去验证artifact_uri的可访问性和模型的有效性
# e.g., check_s3_uri_access(version_data.artifact_uri)
new_version = models.ModelVersion(
family_id=family.id,
**version_data.dict()
)
db.add(new_version)
db.commit()
db.refresh(new_version)
return new_version
@app.get("/tenants/{tenant_id}/models/{model_name}/versions/{version_tag}",
response_model=ModelVersionResponse)
def get_model_version_details(
tenant_id: uuid.UUID,
model_name: str,
version_tag: str,
db: Session = Depends(get_db)
):
"""
供其他服务(如部署编排服务)调用的内部API,用于获取模型制品的详细信息。
这是防腐层(ACL)的一部分。
"""
version = (db.query(models.ModelVersion)
.join(models.ModelFamily)
.filter(models.ModelFamily.tenant_id == tenant_id,
models.ModelFamily.name == model_name,
models.ModelVersion.version_tag == version_tag)
.first())
if not version:
raise HTTPException(status_code=404, detail="Model version not found.")
return version
注意register_new_model_version
这个函数,它封装了核心业务逻辑:确保租户存在、原子性地“查找或创建”模型家族、校验版本唯一性。这是领域服务(Domain Service)的体现。
核心实现:部署编排上下文
这个上下文是系统的“大脑”,负责状态管理和与外部基础设施的交互。它的复杂性在于处理异步操作和状态转换。
1. 数据模型与状态机 (models.py
)
# deployment_orchestration_service/models.py
import uuid
from enum import Enum
from datetime import datetime
from sqlalchemy import (Column, String, DateTime, ForeignKey, JSON,
UniqueConstraint, Enum as SQLAlchemyEnum)
from sqlalchemy.orm import declarative_base
from sqlalchemy.dialects.postgresql import UUID
Base = declarative_base()
class DeploymentStatus(str, Enum):
PENDING = "PENDING"
CREATING = "CREATING" # 正在与云服务商交互
ACTIVE = "ACTIVE" # 部署成功,端点可用
FAILED = "FAILED"
TERMINATING = "TERMINATING"
TERMINATED = "TERMINATED"
class Deployment(Base):
"""
聚合根:部署。代表一个模型版本到在线服务的映射。
"""
__tablename__ = "deployments"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
tenant_id = Column(UUID(as_uuid=True), nullable=False)
# 部署的唯一名称,用于生成访问域名
deployment_name = Column(String, nullable=False)
# 引用模型生命周期上下文中的模型版本ID
model_version_id = Column(UUID(as_uuid=True), nullable=False)
status = Column(SQLAlchemyEnum(DeploymentStatus), nullable=False, default=DeploymentStatus.PENDING)
# 云服务商返回的端点信息
endpoint_details = Column(JSON) # e.g., {"url": "https://...", "cloud_resource_id": "arn:..."}
# 部署配置,例如实例类型、数量等
deployment_config = Column(JSON) # e.g., {"instance_type": "ml.m5.large", "instance_count": 1}
error_message = Column(String, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
__table_args__ = (UniqueConstraint('tenant_id', 'deployment_name', name='_tenant_deployment_name_uc'),)
Deployment
聚合根的核心是status
字段。所有的业务操作都围绕这个状态机进行。
2. 服务与云服务适配器 (main.py
和 cloud_adapter.py
)
这里的关键是隔离云服务商的具体实现。我们定义一个抽象的CloudProviderAdapter
接口,然后可以为AWS SageMaker、Kubernetes等提供具体实现。
# deployment_orchestration_service/cloud_adapter.py
from abc import ABC, abstractmethod
from typing import Dict, Any
class CloudDeploymentResult(BaseModel):
status: str # "SUCCESS" or "FAILURE"
endpoint_details: Dict[str, Any] = None
error_message: str = None
class CloudProviderAdapter(ABC):
"""
适配器模式:隔离与特定云服务商的交互。
"""
@abstractmethod
def create_inference_endpoint(self, deployment_name: str, artifact_uri: str, config: Dict[str, Any]) -> CloudDeploymentResult:
"""
创建一个推理端点。这是一个耗时操作,通常是异步的。
在真实实现中,这会触发一个后台任务。
"""
pass
@abstractmethod
def delete_inference_endpoint(self, deployment_name: str, endpoint_details: Dict[str, Any]) -> None:
"""删除推理端点"""
pass
class MockCloudProviderAdapter(CloudProviderAdapter):
"""一个用于测试和演示的伪实现。"""
def create_inference_endpoint(self, deployment_name: str, artifact_uri: str, config: Dict[str, Any]) -> CloudDeploymentResult:
import time
import random
print(f"[MockAdapter] Received request to deploy {artifact_uri} for {deployment_name} with config {config}")
time.sleep(2) # 模拟API调用延迟
if random.random() < 0.1: # 10%的失败率
return CloudDeploymentResult(status="FAILURE", error_message="Mock cloud provider random failure.")
return CloudDeploymentResult(
status="SUCCESS",
endpoint_details={
"url": f"https://{deployment_name}.mock-cloud.com/invocations",
"cloud_resource_id": f"mock-id-{uuid.uuid4()}"
}
)
def delete_inference_endpoint(self, deployment_name: str, endpoint_details: Dict[str, Any]) -> None:
print(f"[MockAdapter] Deleting endpoint {deployment_name} with details {endpoint_details}")
time.sleep(1)
print(f"[MockAdapter] Deletion successful for {deployment_name}")
现在,主服务逻辑可以注入这个适配器,而不需要知道底层的具体实现。
# deployment_orchestration_service/main.py
import uuid
from fastapi import FastAPI, Depends, HTTPException, BackgroundTasks
from pydantic import BaseModel
from .acl import ModelLifecycleACL # 这是一个简单的API客户端
from .cloud_adapter import MockCloudProviderAdapter, CloudProviderAdapter
from .database import get_db
from . import models
app = FastAPI(title="Deployment Orchestration Service")
# --- 依赖注入 ---
def get_cloud_adapter() -> CloudProviderAdapter:
# 这里可以根据配置选择不同的实现
return MockCloudProviderAdapter()
def get_model_lifecycle_acl() -> ModelLifecycleACL:
# ACL客户端,负责与模型生命周期服务通信
return ModelLifecycleACL(base_url="http://model-lifecycle-service:8000")
# --- DTOs ---
class DeploymentCreateRequest(BaseModel):
deployment_name: str
model_name: str
version_tag: str
config: dict = {"instance_type": "default"}
# --- 后台任务与服务逻辑 ---
def _run_deployment_task(
deployment_id: uuid.UUID,
db: Session,
acl: ModelLifecycleACL,
adapter: CloudProviderAdapter
):
"""
这是在后台执行的核心部署逻辑。
"""
deployment = db.query(models.Deployment).filter(models.Deployment.id == deployment_id).first()
if not deployment:
# log error
return
try:
# 1. 更新状态为CREATING
deployment.status = models.DeploymentStatus.CREATING
db.commit()
# 2. 通过ACL从模型生命周期服务获取模型信息
model_version = acl.get_model_version(
tenant_id=deployment.tenant_id,
model_name=deployment.deployment_name, # 假设与deployment同名
version_tag="latest" # 从请求中获取
)
if not model_version:
raise Exception("Model version not found via ACL.")
# 3. 调用云服务适配器创建端点
result = adapter.create_inference_endpoint(
deployment_name=deployment.deployment_name,
artifact_uri=model_version.artifact_uri,
config=deployment.deployment_config
)
# 4. 根据结果更新部署状态
if result.status == "SUCCESS":
deployment.status = models.DeploymentStatus.ACTIVE
deployment.endpoint_details = result.endpoint_details
else:
deployment.status = models.DeploymentStatus.FAILED
deployment.error_message = result.error_message
db.commit()
except Exception as e:
deployment.status = models.DeploymentStatus.FAILED
deployment.error_message = str(e)
db.commit()
@app.post("/tenants/{tenant_id}/deployments", status_code=202)
def create_deployment(
tenant_id: uuid.UUID,
request: DeploymentCreateRequest,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
acl: ModelLifecycleACL = Depends(get_model_lifecycle_acl),
adapter: CloudProviderAdapter = Depends(get_cloud_adapter)
):
# 检查同名部署是否存在
if db.query(models.Deployment).filter_by(tenant_id=tenant_id, deployment_name=request.deployment_name).first():
raise HTTPException(status_code=409, detail="Deployment with this name already exists.")
# 1. 创建Deployment记录,初始状态为PENDING
new_deployment = models.Deployment(
tenant_id=tenant_id,
deployment_name=request.deployment_name,
# model_version_id 可以在后台任务中填充
deployment_config=request.config,
status=models.DeploymentStatus.PENDING
)
db.add(new_deployment)
db.commit()
db.refresh(new_deployment)
# 2. 启动后台任务执行实际的部署操作
background_tasks.add_task(_run_deployment_task, new_deployment.id, db, acl, adapter)
return {"message": "Deployment process started.", "deployment_id": new_deployment.id}
这个create_deployment
端点是异步的。它立即返回202 Accepted
,并创建一个后台任务来执行耗时的云资源编排。这种设计避免了长时间阻塞API请求,是处理长时间运行任务的常见模式。一个常见的错误是把所有逻辑都放在API请求处理函数中同步执行,这在生产环境中是不可接受的。
常见误区与适用边界
- 过度设计: 对一个只有两三个模型的内部项目应用如此复杂的架构是浪费时间。DDD的价值在于管理本质复杂性。如果业务本身不复杂,简单的脚本或单体应用是更务实的选择。
- 上下文划分不清: 最大的坑在于错误地划分限界上下文。例如,如果将
Deployment
的状态直接存储在ModelVersion
表中,会导致模型生命周期
上下文被迫关心部署细节,破坏了其单一职责。一个模型版本应该是不可变的,而它的部署状态是可变的,它们分属不同领域。 - 忽略防腐层: 在上下文之间直接共享数据库是捷径,但也是长期维护的噩梦。当
模型生命周期
服务的数据库Schema变更时,可能会意外地破坏部署编排
服务。通过API进行通信虽然增加了网络开销,但换来了强有力的团队和技术隔离。
这个架构的局限性在于其初始实现的复杂度和资源开销。它需要多个服务、数据库和消息队列(在更成熟的事件驱动版本中),比单体应用需要更多的运维投入。然而,对于一个需要支持多团队、多业务线、并且希望在不同云服务商之间保持一定灵活性的CV平台来说,这种基于DDD的清晰划分是保证其长期健康演进的关键。未来的迭代,例如增加模型监控、A/B测试、自动缩放等功能,都可以作为新的上下文或在现有上下文中平滑地引入,而不会污染核心领域模型。