基于领域驱动设计构建多租户计算机视觉模型部署服务的限界上下文划分与实现


将一个独立的计算机视觉(CV)模型打包成服务并非难事,一个包含 Dockerfilerequirements.txt 和一个 main.py 的 FastAPI 应用足以应付。然而,当需求演变为构建一个支持多租户、多模型、多版本的平台级服务时,这种简单的脚本化思维会迅速崩溃。代码库会因为混杂了模型加载、数据预处理、权限校验、资源编排和计费逻辑而变得无法维护。

在真实项目中,复杂性来源于不同业务关注点的纠缠。一个模型从研究员的实验品到生产环境中的稳定API,其生命周期中涉及的领域知识截然不同。领域驱动设计(DDD)的核心价值在于,它强制我们识别并隔离这些不同的领域,在各自的限界上下文(Bounded Context)中独立建模和演进。这正是解决平台级模型部署混乱的根本之道。

本文不讨论如何训练一个具体的CV模型,而是聚焦于如何运用DDD思想,设计并实现一个多租户CV模型部署平台的后端骨架。我们将划分出清晰的限界上下文,并用生产级的Python代码展示其核心实现,目标是构建一个逻辑清晰、易于扩展且与具体云服务商实现解耦的系统。

限界上下文的识别与划分

一个模型部署平台至少要处理以下几类问题:

  1. 能操作?—— 身份与租户管理。
  2. 什么是模型?—— 模型的元数据、版本、存储。
  3. 如何部署?—— 将模型版本转化为可访问的在线服务。
  4. 如何调用?—— 提供高性能的推理端点。

这些问题的本质、变化频率和技术要求各不相同。强行将它们塞入一个单体应用,就是技术债的开始。依据DDD,我们将其划分为四个核心的限界上下文:

  1. 身份与访问上下文 (Identity & Access Context): 负责处理租户(Tenant)、用户(User)和API密钥(ApiKey)。它的模型非常稳定,主要关注认证与授权。
  2. 模型生命周期上下文 (Model Lifecycle Context): 核心是模型家族(ModelFamily)模型版本(ModelVersion)这两个聚合根。它关心模型的元数据、验证标准、存储位置(如S3 URI),但不关心它如何被部署或运行。
  3. 部署编排上下文 (Deployment Orchestration Context): 它的职责是“翻译官”。它接收一个“部署某个模型版本”的业务请求,并将其翻译成对底层云服务(如AWS SageMaker, GCP Vertex AI, หรือ Kubernetes)的具体API调用。它的核心聚合根是部署(Deployment)
  4. 推理服务上下文 (Inference Serving Context): 这是一个纯粹的“数据平面”组件。它的唯一职责就是根据指令加载指定模型,并以最低延迟处理推理请求。它无状态、高性能,并且对前三个上下文的复杂业务逻辑一无所知。

这四个上下文之间的关系可以通过下面的上下文映射图(Context Map)来表示。

graph TD
    subgraph Identity & Access Context
        A[Tenant]
        B[User]
        C[API Key]
    end

    subgraph Model Lifecycle Context
        D[ModelFamily]
        E[ModelVersion]
    end

    subgraph Deployment Orchestration Context
        F[Deployment]
        G[Endpoint]
    end

    subgraph Inference Serving Context
        H[Inference Worker]
    end

    style Inference Serving Context fill:#f9f,stroke:#333,stroke-width:2px

    User -- Manages --> ModelFamily
    User -- Requests --> Deployment
    Deployment -- "Reads Model Info via ACL" --> ModelVersion
    Deployment -- "Orchestrates Cloud Resources" --> G
    G -- "Routes Traffic to" --> H
    H -- "Loads Artifact from S3" --> ModelVersion

    %% ACL Definition
    linkStyle 2 stroke-width:2px,stroke:red,fill:none;

这里的关键在于部署编排上下文模型生命周期上下文之间的交互。它们之间不共享数据库,而是通过一个明确定义的防腐层(Anti-Corruption Layer, ACL)进行通信。在我们的实现中,这个ACL就是一个内部API客户端。

核心实现:模型生命周期上下文

这个上下文是所有模型信息的权威来源。它的API相对简单,主要是CRUD操作,但其内部的数据模型和验证逻辑必须非常健壮。

我们使用FastAPI和Pydantic来构建服务,用SQLAlchemy来定义数据模型。

1. 数据模型 (models.py)

# model_lifecycle_service/models.py
import uuid
from datetime import datetime
from sqlalchemy import (Column, String, DateTime, ForeignKey, JSON,
                        UniqueConstraint)
from sqlalchemy.orm import relationship, declarative_base
from sqlalchemy.dialects.postgresql import UUID

Base = declarative_base()

class Tenant(Base):
    __tablename__ = "tenants"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String, unique=True, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

class ModelFamily(Base):
    """
    聚合根:模型家族。代表一个特定功能的模型,例如“人脸检测模型”。
    它包含多个版本。
    """
    __tablename__ = "model_families"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    tenant_id = Column(UUID(as_uuid=True), ForeignKey("tenants.id"), nullable=False)
    name = Column(String, nullable=False)
    description = Column(String)
    created_at = Column(DateTime, default=datetime.utcnow)

    versions = relationship("ModelVersion", back_populates="family", cascade="all, delete-orphan")
    
    __table_args__ = (UniqueConstraint('tenant_id', 'name', name='_tenant_model_name_uc'),)

class ModelVersion(Base):
    """
    实体:模型版本。一个具体的、不可变的模型文件。
    """
    __tablename__ = "model_versions"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    family_id = Column(UUID(as_uuid=True), ForeignKey("model_families.id"), nullable=False)
    version_tag = Column(String, nullable=False, default="latest") # e.g., "v1.0.0", "latest"
    
    # 指向云存储中模型文件的URI,例如 s3://my-bucket/models/face-detector/v1.0.0/model.onnx
    artifact_uri = Column(String, nullable=False) 
    
    # 模型的元数据,例如输入/输出格式,指标等
    metadata = Column(JSON) 
    
    created_at = Column(DateTime, default=datetime.utcnow)

    family = relationship("ModelFamily", back_populates="versions")

    __table_args__ = (UniqueConstraint('family_id', 'version_tag', name='_family_version_tag_uc'),)

这里的ModelFamily是聚合根。它确保了tenant_idname的组合是唯一的,维护了业务不变量。ModelVersion是其下的一个实体。

2. API接口与服务逻辑 (main.py)

# model_lifecycle_service/main.py
import os
import uuid
from typing import List
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from .database import get_db, engine
from . import models

models.Base.metadata.create_all(bind=engine)

app = FastAPI(title="Model Lifecycle Service")

# --- DTOs (Data Transfer Objects) ---
class ModelVersionCreate(BaseModel):
    version_tag: str = "latest"
    artifact_uri: str = Field(..., example="s3://my-model-bucket/path/to/model.onnx")
    metadata: dict = Field(default_factory=dict)

class ModelVersionResponse(BaseModel):
    id: uuid.UUID
    version_tag: str
    artifact_uri: str
    metadata: dict
    created_at: str

    class Config:
        orm_mode = True

class ModelFamilyResponse(BaseModel):
    id: uuid.UUID
    name: str
    description: str
    versions: List[ModelVersionResponse]

    class Config:
        orm_mode = True

# --- API Endpoints ---
@app.post("/tenants/{tenant_id}/models/{model_name}/versions", 
          response_model=ModelVersionResponse, status_code=201)
def register_new_model_version(
    tenant_id: uuid.UUID,
    model_name: str,
    version_data: ModelVersionCreate,
    db: Session = Depends(get_db)
):
    """
    注册一个新的模型版本到一个模型家族。如果家族不存在,则创建。
    这是一个核心的写操作,必须是事务性的。
    """
    # 检查租户是否存在(在真实系统中,这会通过调用Identity服务或共享的租户表)
    tenant = db.query(models.Tenant).filter(models.Tenant.id == tenant_id).first()
    if not tenant:
        raise HTTPException(status_code=404, detail=f"Tenant '{tenant_id}' not found.")
    
    # 查找或创建ModelFamily
    family = db.query(models.ModelFamily).filter_by(tenant_id=tenant_id, name=model_name).first()
    if not family:
        family = models.ModelFamily(tenant_id=tenant_id, name=model_name, description=f"Auto-created family for {model_name}")
        db.add(family)
        # 必须先flush才能获取family.id
        db.flush() 

    # 检查版本标签是否已存在
    existing_version = db.query(models.ModelVersion).filter_by(family_id=family.id, version_tag=version_data.version_tag).first()
    if existing_version:
        raise HTTPException(status_code=409, detail=f"Version tag '{version_data.version_tag}' already exists for model '{model_name}'.")

    # 在真实项目中,这里应该有逻辑去验证artifact_uri的可访问性和模型的有效性
    # e.g., check_s3_uri_access(version_data.artifact_uri)
    
    new_version = models.ModelVersion(
        family_id=family.id,
        **version_data.dict()
    )
    db.add(new_version)
    db.commit()
    db.refresh(new_version)

    return new_version

@app.get("/tenants/{tenant_id}/models/{model_name}/versions/{version_tag}",
         response_model=ModelVersionResponse)
def get_model_version_details(
    tenant_id: uuid.UUID,
    model_name: str,
    version_tag: str,
    db: Session = Depends(get_db)
):
    """
    供其他服务(如部署编排服务)调用的内部API,用于获取模型制品的详细信息。
    这是防腐层(ACL)的一部分。
    """
    version = (db.query(models.ModelVersion)
               .join(models.ModelFamily)
               .filter(models.ModelFamily.tenant_id == tenant_id,
                       models.ModelFamily.name == model_name,
                       models.ModelVersion.version_tag == version_tag)
               .first())
    
    if not version:
        raise HTTPException(status_code=404, detail="Model version not found.")
    
    return version

注意register_new_model_version这个函数,它封装了核心业务逻辑:确保租户存在、原子性地“查找或创建”模型家族、校验版本唯一性。这是领域服务(Domain Service)的体现。

核心实现:部署编排上下文

这个上下文是系统的“大脑”,负责状态管理和与外部基础设施的交互。它的复杂性在于处理异步操作和状态转换。

1. 数据模型与状态机 (models.py)

# deployment_orchestration_service/models.py
import uuid
from enum import Enum
from datetime import datetime
from sqlalchemy import (Column, String, DateTime, ForeignKey, JSON,
                        UniqueConstraint, Enum as SQLAlchemyEnum)
from sqlalchemy.orm import declarative_base
from sqlalchemy.dialects.postgresql import UUID

Base = declarative_base()

class DeploymentStatus(str, Enum):
    PENDING = "PENDING"
    CREATING = "CREATING" # 正在与云服务商交互
    ACTIVE = "ACTIVE"   # 部署成功,端点可用
    FAILED = "FAILED"
    TERMINATING = "TERMINATING"
    TERMINATED = "TERMINATED"

class Deployment(Base):
    """
    聚合根:部署。代表一个模型版本到在线服务的映射。
    """
    __tablename__ = "deployments"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    tenant_id = Column(UUID(as_uuid=True), nullable=False)
    
    # 部署的唯一名称,用于生成访问域名
    deployment_name = Column(String, nullable=False) 
    
    # 引用模型生命周期上下文中的模型版本ID
    model_version_id = Column(UUID(as_uuid=True), nullable=False)
    
    status = Column(SQLAlchemyEnum(DeploymentStatus), nullable=False, default=DeploymentStatus.PENDING)
    
    # 云服务商返回的端点信息
    endpoint_details = Column(JSON) # e.g., {"url": "https://...", "cloud_resource_id": "arn:..."}
    
    # 部署配置,例如实例类型、数量等
    deployment_config = Column(JSON) # e.g., {"instance_type": "ml.m5.large", "instance_count": 1}
    
    error_message = Column(String, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    __table_args__ = (UniqueConstraint('tenant_id', 'deployment_name', name='_tenant_deployment_name_uc'),)

Deployment聚合根的核心是status字段。所有的业务操作都围绕这个状态机进行。

2. 服务与云服务适配器 (main.pycloud_adapter.py)

这里的关键是隔离云服务商的具体实现。我们定义一个抽象的CloudProviderAdapter接口,然后可以为AWS SageMaker、Kubernetes等提供具体实现。

# deployment_orchestration_service/cloud_adapter.py
from abc import ABC, abstractmethod
from typing import Dict, Any

class CloudDeploymentResult(BaseModel):
    status: str # "SUCCESS" or "FAILURE"
    endpoint_details: Dict[str, Any] = None
    error_message: str = None

class CloudProviderAdapter(ABC):
    """
    适配器模式:隔离与特定云服务商的交互。
    """
    @abstractmethod
    def create_inference_endpoint(self, deployment_name: str, artifact_uri: str, config: Dict[str, Any]) -> CloudDeploymentResult:
        """
        创建一个推理端点。这是一个耗时操作,通常是异步的。
        在真实实现中,这会触发一个后台任务。
        """
        pass

    @abstractmethod
    def delete_inference_endpoint(self, deployment_name: str, endpoint_details: Dict[str, Any]) -> None:
        """删除推理端点"""
        pass

class MockCloudProviderAdapter(CloudProviderAdapter):
    """一个用于测试和演示的伪实现。"""
    def create_inference_endpoint(self, deployment_name: str, artifact_uri: str, config: Dict[str, Any]) -> CloudDeploymentResult:
        import time
        import random
        
        print(f"[MockAdapter] Received request to deploy {artifact_uri} for {deployment_name} with config {config}")
        time.sleep(2) # 模拟API调用延迟
        
        if random.random() < 0.1: # 10%的失败率
            return CloudDeploymentResult(status="FAILURE", error_message="Mock cloud provider random failure.")
        
        return CloudDeploymentResult(
            status="SUCCESS",
            endpoint_details={
                "url": f"https://{deployment_name}.mock-cloud.com/invocations",
                "cloud_resource_id": f"mock-id-{uuid.uuid4()}"
            }
        )

    def delete_inference_endpoint(self, deployment_name: str, endpoint_details: Dict[str, Any]) -> None:
        print(f"[MockAdapter] Deleting endpoint {deployment_name} with details {endpoint_details}")
        time.sleep(1)
        print(f"[MockAdapter] Deletion successful for {deployment_name}")

现在,主服务逻辑可以注入这个适配器,而不需要知道底层的具体实现。

# deployment_orchestration_service/main.py
import uuid
from fastapi import FastAPI, Depends, HTTPException, BackgroundTasks
from pydantic import BaseModel
from .acl import ModelLifecycleACL # 这是一个简单的API客户端
from .cloud_adapter import MockCloudProviderAdapter, CloudProviderAdapter
from .database import get_db
from . import models

app = FastAPI(title="Deployment Orchestration Service")

# --- 依赖注入 ---
def get_cloud_adapter() -> CloudProviderAdapter:
    # 这里可以根据配置选择不同的实现
    return MockCloudProviderAdapter()

def get_model_lifecycle_acl() -> ModelLifecycleACL:
    # ACL客户端,负责与模型生命周期服务通信
    return ModelLifecycleACL(base_url="http://model-lifecycle-service:8000")

# --- DTOs ---
class DeploymentCreateRequest(BaseModel):
    deployment_name: str
    model_name: str
    version_tag: str
    config: dict = {"instance_type": "default"}

# --- 后台任务与服务逻辑 ---
def _run_deployment_task(
    deployment_id: uuid.UUID,
    db: Session,
    acl: ModelLifecycleACL,
    adapter: CloudProviderAdapter
):
    """
    这是在后台执行的核心部署逻辑。
    """
    deployment = db.query(models.Deployment).filter(models.Deployment.id == deployment_id).first()
    if not deployment:
        # log error
        return
        
    try:
        # 1. 更新状态为CREATING
        deployment.status = models.DeploymentStatus.CREATING
        db.commit()

        # 2. 通过ACL从模型生命周期服务获取模型信息
        model_version = acl.get_model_version(
            tenant_id=deployment.tenant_id,
            model_name=deployment.deployment_name, # 假设与deployment同名
            version_tag="latest" # 从请求中获取
        )
        if not model_version:
            raise Exception("Model version not found via ACL.")

        # 3. 调用云服务适配器创建端点
        result = adapter.create_inference_endpoint(
            deployment_name=deployment.deployment_name,
            artifact_uri=model_version.artifact_uri,
            config=deployment.deployment_config
        )

        # 4. 根据结果更新部署状态
        if result.status == "SUCCESS":
            deployment.status = models.DeploymentStatus.ACTIVE
            deployment.endpoint_details = result.endpoint_details
        else:
            deployment.status = models.DeploymentStatus.FAILED
            deployment.error_message = result.error_message
        
        db.commit()

    except Exception as e:
        deployment.status = models.DeploymentStatus.FAILED
        deployment.error_message = str(e)
        db.commit()


@app.post("/tenants/{tenant_id}/deployments", status_code=202)
def create_deployment(
    tenant_id: uuid.UUID,
    request: DeploymentCreateRequest,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    acl: ModelLifecycleACL = Depends(get_model_lifecycle_acl),
    adapter: CloudProviderAdapter = Depends(get_cloud_adapter)
):
    # 检查同名部署是否存在
    if db.query(models.Deployment).filter_by(tenant_id=tenant_id, deployment_name=request.deployment_name).first():
        raise HTTPException(status_code=409, detail="Deployment with this name already exists.")

    # 1. 创建Deployment记录,初始状态为PENDING
    new_deployment = models.Deployment(
        tenant_id=tenant_id,
        deployment_name=request.deployment_name,
        # model_version_id 可以在后台任务中填充
        deployment_config=request.config,
        status=models.DeploymentStatus.PENDING
    )
    db.add(new_deployment)
    db.commit()
    db.refresh(new_deployment)

    # 2. 启动后台任务执行实际的部署操作
    background_tasks.add_task(_run_deployment_task, new_deployment.id, db, acl, adapter)

    return {"message": "Deployment process started.", "deployment_id": new_deployment.id}

这个create_deployment端点是异步的。它立即返回202 Accepted,并创建一个后台任务来执行耗时的云资源编排。这种设计避免了长时间阻塞API请求,是处理长时间运行任务的常见模式。一个常见的错误是把所有逻辑都放在API请求处理函数中同步执行,这在生产环境中是不可接受的。

常见误区与适用边界

  • 过度设计: 对一个只有两三个模型的内部项目应用如此复杂的架构是浪费时间。DDD的价值在于管理本质复杂性。如果业务本身不复杂,简单的脚本或单体应用是更务实的选择。
  • 上下文划分不清: 最大的坑在于错误地划分限界上下文。例如,如果将Deployment的状态直接存储在ModelVersion表中,会导致模型生命周期上下文被迫关心部署细节,破坏了其单一职责。一个模型版本应该是不可变的,而它的部署状态是可变的,它们分属不同领域。
  • 忽略防腐层: 在上下文之间直接共享数据库是捷径,但也是长期维护的噩梦。当模型生命周期服务的数据库Schema变更时,可能会意外地破坏部署编排服务。通过API进行通信虽然增加了网络开销,但换来了强有力的团队和技术隔离。

这个架构的局限性在于其初始实现的复杂度和资源开销。它需要多个服务、数据库和消息队列(在更成熟的事件驱动版本中),比单体应用需要更多的运维投入。然而,对于一个需要支持多团队、多业务线、并且希望在不同云服务商之间保持一定灵活性的CV平台来说,这种基于DDD的清晰划分是保证其长期健康演进的关键。未来的迭代,例如增加模型监控、A/B测试、自动缩放等功能,都可以作为新的上下文或在现有上下文中平滑地引入,而不会污染核心领域模型。


  目录