基于Scala与自定义ISR日志实现Qdrant高可用向量摄入网关


系统入口的写入请求,如果直接透传到后端的有状态服务,尤其像Qdrant这样的向量数据库,会面临一个直接的脆弱性:数据库的短暂抖动、网络分区或写入压力过大,都会直接导致请求失败。客户端被迫实现复杂的重试逻辑,而系统整体的写入可用性则完全取决于数据库的瞬时状态。在生产环境中,这种设计是不可接受的。

我们需要在网关层引入一个具备持久化能力的缓冲层。常规方案是引入Kafka或Pulsar,但这对于某些场景而言过于笨重。我们的目标是构建一个轻量级、高可用的摄入网关,它能对写入请求进行缓冲和复制,确保数据在进入Qdrant之前绝不丢失,同时为客户端提供一个稳定且高吞吐的入口。

我们将使用Scala和Akka Typed Actor模型,从头实现一个基于In-Sync Replicas (ISR)概念的轻量级分布式日志服务,并将其集成到一个代理网关中,专门负责向Qdrant的向量集合进行高可用写入。这个网关的核心职责是:接收写入请求,将请求持久化到内部的复制日志中,一旦日志在ISR节点间达成一致,立即向客户端返回成功,然后异步地、可靠地将数据批量推送到Qdrant。

架构设计:解耦写入确认与最终持久化

整体架构围绕一个核心目标:将客户端的写入确认(ack)与Qdrant的实际写入操作解耦。

sequenceDiagram
    participant Client
    participant IngestionGateway (Scala/Akka HTTP)
    participant ReplicationCoordinator (Leader Actor)
    participant LogFollower1 (Follower Actor)
    participant LogFollower2 (Follower Actor)
    participant QdrantWriter (Actor)
    participant Qdrant

    Client->>+IngestionGateway: POST /v1/collections/my_collection/points (vector data)
    IngestionGateway->>+ReplicationCoordinator: WriteOp(id, payload)
    ReplicationCoordinator->>ReplicationCoordinator: Append to own log (offset: N)
    ReplicationCoordinator->>+LogFollower1: Replicate(offset: N, payload)
    ReplicationCoordinator->>+LogFollower2: Replicate(offset: N, payload)

    LogFollower1-->>-ReplicationCoordinator: Ack(offset: N)
    Note right of ReplicationCoordinator: ISR set = {Self, Follower1}. Follower2 is lagging.
    ReplicationCoordinator-->>-IngestionGateway: WriteConfirmed(id)
    IngestionGateway-->>-Client: HTTP 202 Accepted

    LogFollower2-->>-ReplicationCoordinator: Ack(offset: N)
    Note right of ReplicationCoordinator: Follower2 catches up, added to ISR.

    QdrantWriter->>ReplicationCoordinator: PullCommittedLogs
    ReplicationCoordinator-->>QdrantWriter: [Op(N), Op(N+1), ...]
    QdrantWriter->>+Qdrant: Upsert Points (batch)
    Qdrant-->>-QdrantWriter: Success

这个流程的关键点在于:IngestionGateway向客户端返回202Accepted的时刻,是基于ReplicationCoordinator的确认,而非Qdrant。这个确认的条件是“写入操作已被ISR集合中的所有副本确认”,这为我们提供了可配置的数据持久性保证。

核心组件实现:Akka Typed Actor

我们将使用Akka Typed来构建这个并发系统。Actor模型天然适合这种需要管理状态、处理消息和容错的分布式组件。

1. Actor协议定义 (Protocol.scala)

首先定义所有参与者之间的通信契约。这是构建强类型Actor系统的第一步。

import akka.actor.typed.ActorRef

object ReplicationProtocol {
  // Common trait for all commands
  sealed trait Command

  // Client-facing command sent to the coordinator
  final case class Write(payload: String, replyTo: ActorRef[WriteResult]) extends Command
  sealed trait WriteResult
  final case class WriteSuccess(correlationId: Long) extends WriteResult
  final case class WriteFailure(correlationId: Long, reason: String) extends WriteResult

  // Internal replication commands
  private[ReplicationProtocol] final case class Replicate(logId: Long, payload: String) extends Command
  private[ReplicationProtocol] final case class ReplicationAck(logId: Long, from: ActorRef[Command]) extends Command

  // Follower management
  private[ReplicationProtocol] final case class RegisterFollower(follower: ActorRef[Command]) extends Command
  private[ReplicationProtocol] final case class FollowerHeartbeat(from: ActorRef[Command]) extends Command
  private[ReplicationProtocol] case object CheckHeartbeatTimeout extends Command

  // Data pulling for Qdrant writer
  final case class PullCommittedLogs(replyTo: ActorRef[CommittedLogs]) extends Command
  final case class CommittedLogs(logs: Seq[(Long, String)])
}

2. 复制协调器 (ReplicationCoordinator.scala)

这是系统的核心,扮演Leader角色。它负责接收客户端写入、向Follower复制日志、维护ISR集合以及管理已提交的日志偏移量。

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors, TimerScheduler}
import scala.collection.mutable
import scala.concurrent.duration._

object ReplicationCoordinator {
  import ReplicationProtocol._

  def apply(minIsr: Int): Behavior[Command] =
    Behaviors.setup { context =>
      Behaviors.withTimers { timers =>
        new ReplicationCoordinator(context, timers, minIsr)
      }
    }
}

class ReplicationCoordinator(
  context: ActorContext[ReplicationProtocol.Command],
  timers: TimerScheduler[ReplicationProtocol.Command],
  minIsr: Int
) extends AbstractBehavior[ReplicationProtocol.Command](context) {
  import ReplicationProtocol._

  // State of the leader
  private val log = mutable.ArrayBuffer.empty[(Long, String)]
  private var nextLogId: Long = 0L

  private var followers = Set.empty[ActorRef[Command]]
  private var inSyncReplicas = Set.empty[ActorRef[Command]]

  // Tracking pending replications for each log entry
  private val pendingAcks = mutable.Map.empty[Long, (Set[ActorRef[Command]], ActorRef[WriteResult])]

  // High-water mark: log entries up to this ID are committed
  private var commitIndex: Long = -1L

  // For follower health checks
  private val lastHeartbeats = mutable.Map.empty[ActorRef[Command], Long]
  private val heartbeatTimeout = 5.seconds
  private val heartbeatCheckKey = "heartbeat-check"

  // Add self to ISR set
  inSyncReplicas += context.self

  timers.startTimerAtFixedRate(heartbeatCheckKey, CheckHeartbeatTimeout, heartbeatTimeout)

  override def onMessage(msg: Command): Behavior[Command] = {
    msg match {
      case Write(payload, replyTo) =>
        handleWrite(payload, replyTo)
        this

      case ReplicationAck(logId, from) =>
        handleReplicationAck(logId, from)
        this

      case RegisterFollower(follower) =>
        context.log.info(s"Registering new follower: ${follower.path.name}")
        followers += follower
        lastHeartbeats(follower) = System.currentTimeMillis()
        // A new follower is not in sync until it catches up
        this

      case FollowerHeartbeat(from) =>
        if (followers.contains(from)) {
          lastHeartbeats(from) = System.currentTimeMillis()
          // If a follower was out of sync, this could be a trigger to try and catch it up
          // For simplicity, we only add it back to ISR on an actual log ACK.
        }
        this

      case CheckHeartbeatTimeout =>
        checkFollowerHealth()
        this
        
      case PullCommittedLogs(replyTo) =>
        // Only return logs that are safely committed
        val logsToReturn = log.slice(0, (commitIndex + 1).toInt).toSeq
        replyTo ! CommittedLogs(logsToReturn)
        this
    }
  }

  private def handleWrite(payload: String, replyTo: ActorRef[WriteResult]): Unit = {
    val currentLogId = nextLogId
    log.append((currentLogId, payload))
    context.log.info(s"Received write for log $currentLogId. Replicating to ISR: ${inSyncReplicas.size - 1} followers.")

    // Track which ISR members need to acknowledge this write
    pendingAcks(currentLogId) = (inSyncReplicas, replyTo)

    // Replicate to followers that are currently in sync
    val replicateMsg = Replicate(currentLogId, payload)
    inSyncReplicas.filter(_ != context.self).foreach(_ ! replicateMsg)

    // The leader ACKs its own write immediately
    handleReplicationAck(currentLogId, context.self)
    
    nextLogId += 1
  }

  private def handleReplicationAck(logId: Long, from: ActorRef[Command]): Unit = {
    pendingAcks.get(logId) match {
      case Some((pending, replyTo)) =>
        val updatedPending = pending - from
        // A follower that was lagging might catch up and ACK. If so, add it back to ISR.
        if (followers.contains(from) && !inSyncReplicas.contains(from)) {
          context.log.info(s"Follower ${from.path.name} caught up at log $logId. Adding to ISR.")
          inSyncReplicas += from
        }

        if (updatedPending.isEmpty) {
          // All replicas in the ISR set at the time of writing have acknowledged.
          context.log.info(s"Log $logId fully replicated within ISR set.")
          pendingAcks.remove(logId)
          // Advance the commit index. This is crucial.
          updateCommitIndex()
          // Now we can reply to the original client
          replyTo ! WriteSuccess(logId)
        } else {
          // Update the pending set
          pendingAcks(logId) = (updatedPending, replyTo)
        }
      case None =>
      // This could be a late ACK for an already committed log, which is fine.
      // Or an ack from a follower that was not in ISR, which is also fine.
    }
  }

  private def updateCommitIndex(): Unit = {
      // The commit index is the highest log ID that has been acked by all replicas in the current ISR set.
      // This is a simplified model. A more robust implementation would track acknowledgements per replica.
      // Here, we advance commitIndex to the highest consecutively successful log write.
      while (commitIndex + 1 < nextLogId && !pendingAcks.contains(commitIndex + 1)) {
          commitIndex += 1
      }
      context.log.info(s"Commit index advanced to $commitIndex")
  }


  private def checkFollowerHealth(): Unit = {
    val now = System.currentTimeMillis()
    val deadFollowers = followers.filter { f =>
      now - lastHeartbeats.getOrElse(f, now) > heartbeatTimeout.toMillis
    }

    if (deadFollowers.nonEmpty) {
      context.log.warn(s"Followers timed out and removed from ISR: $deadFollowers")
      followers --= deadFollowers
      inSyncReplicas --= deadFollowers
      lastHeartbeats --= deadFollowers

      // If ISR size drops below minimum, we can no longer guarantee durability.
      // In a real system, this should trigger an alarm or even halt new writes.
      if (inSyncReplicas.size < minIsr) {
        context.log.error(s"CRITICAL: ISR size (${inSyncReplicas.size}) is below minimum required ($minIsr). Write durability is at risk.")
        // Here we could implement a "circuit breaker" logic to reject new writes.
        // For now, we just log an error.
      }
    }
  }
}

代码解析:

  • 状态管理: log, followers, inSyncReplicas, pendingAcks, commitIndex 构成了Leader的全部状态。
  • ISR维护: checkFollowerHealth是ISR管理的核心。它定期检查心跳,将超时的Follower从followersinSyncReplicas集合中移除。如果inSyncReplicas的数量低于配置的minIsr,系统进入危险状态。
  • 写入流程: handleWrite接收请求,将日志追加到本地,然后向当前ISR集合中的所有Follower广播Replicate消息。它通过pendingAcks来跟踪每个日志条目的确认状态。
  • 确认与提交: handleReplicationAck在收到Follower的确认后,更新pendingAcks。当一个日志条目的所有待确认副本都清空后,代表该日志在ISR内部达成一致。此时,我们才能安全地向客户端确认写入,并尝试推进commitIndex
  • 数据可见性: PullCommittedLogs接口只返回commitIndex之前的数据,确保下游消费者(QdrantWriter)不会读取到尚未被完全复制的数据。

3. 日志跟随者 (LogFollower.scala)

Follower的角色相对简单:接收日志、存储,并发送心跳和ACK。

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors, TimerScheduler}
import scala.collection.mutable
import scala.concurrent.duration._

object LogFollower {
  import ReplicationProtocol._

  def apply(coordinator: ActorRef[Command]): Behavior[Command] = {
    Behaviors.setup { context =>
      Behaviors.withTimers { timers =>
        new LogFollower(context, timers, coordinator)
      }
    }
  }
}

class LogFollower(
  context: ActorContext[ReplicationProtocol.Command],
  timers: TimerScheduler[ReplicationProtocol.Command],
  coordinator: ActorRef[ReplicationProtocol.Command]
) extends AbstractBehavior[ReplicationProtocol.Command](context) {
  import ReplicationProtocol._

  private val log = mutable.Map.empty[Long, String]
  private var nextExpectedLogId: Long = 0L
  
  private val heartbeatInterval = 2.seconds
  private val heartbeatKey = "follower-heartbeat"

  override def onMessage(msg: Command): Behavior[Command] = {
    msg match {
      case Replicate(logId, payload) =>
        // For simplicity, we assume messages arrive in order.
        // A production system would need to handle out-of-order messages, gaps, etc.
        if (logId == nextExpectedLogId) {
          context.log.info(s"Replicating log $logId")
          log(logId) = payload
          nextExpectedLogId += 1
          coordinator ! ReplicationAck(logId, context.self)
        } else {
          context.log.warn(s"Received out-of-order log. Expected $nextExpectedLogId, but got $logId. Ignoring.")
          // In a real scenario, the follower should request missing logs from the leader.
        }
        this
      case _ => // Followers ignore other messages
    }
    this
  }

  private def sendHeartbeat(): Unit = {
    coordinator ! FollowerHeartbeat(context.self)
  }

  // Initial registration and start heartbeating
  coordinator ! RegisterFollower(context.self)
  timers.startTimerAtFixedRate(heartbeatKey, FollowerHeartbeat(context.self), heartbeatInterval)
  
  Behaviors.same
}

4. Qdrant写入器 (QdrantWriter.scala)

这是一个独立的Actor,定期从ReplicationCoordinator拉取已提交的日志,并批量写入Qdrant。这种模式将数据库写入的压力和不确定性与主写入路径隔离开。

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors, TimerScheduler}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.RawHeader
import akka.stream.scaladsl.Source
import akka.util.ByteString
import spray.json._ // Assuming spray-json for JSON handling

import scala.concurrent.duration._
import scala.util.{Failure, Success}

// A simple representation of Qdrant point for JSON serialization
case class QdrantPoint(id: String, vector: List[Float], payload: Map[String, String])
case class UpsertRequest(points: Seq[QdrantPoint])

object MyJsonProtocol extends DefaultJsonProtocol {
  implicit val qdrantPointFormat: RootJsonFormat[QdrantPoint] = jsonFormat3(QdrantPoint)
  implicit val upsertRequestFormat: RootJsonFormat[UpsertRequest] = jsonFormat1(UpsertRequest)
}

object QdrantWriter {
  import ReplicationProtocol._

  sealed trait QdrantWriterCommand
  private case object Pull extends QdrantWriterCommand
  private final case class WrappedCommittedLogs(logs: CommittedLogs) extends QdrantWriterCommand

  def apply(
    coordinator: ActorRef[Command],
    qdrantUri: Uri,
    qdrantApiKey: String,
    collectionName: String,
    batchSize: Int
  ): Behavior[QdrantWriterCommand] =
    Behaviors.setup { context =>
      Behaviors.withTimers { timers =>
        new QdrantWriter(context, timers, coordinator, qdrantUri, qdrantApiKey, collectionName, batchSize)
      }
    }
}

class QdrantWriter(
  context: ActorContext[QdrantWriter.QdrantWriterCommand],
  timers: TimerScheduler[QdrantWriter.QdrantWriterCommand],
  coordinator: ActorRef[ReplicationProtocol.Command],
  qdrantUri: Uri,
  qdrantApiKey: String,
  collectionName: String,
  batchSize: Int
) extends AbstractBehavior[QdrantWriter.QdrantWriterCommand](context) {
  import QdrantWriter._
  import ReplicationProtocol._
  import MyJsonProtocol._
  import context.executionContext

  private var lastPulledIndex: Long = -1L
  private val http = Http(context.system)

  private val committedLogsAdapter: ActorRef[CommittedLogs] = context.messageAdapter(WrappedCommittedLogs.apply)
  
  timers.startTimerAtFixedRate("qdrant-pull-timer", Pull, 5.seconds)

  override def onMessage(msg: QdrantWriterCommand): Behavior[QdrantWriterCommand] = {
    msg match {
      case Pull =>
        coordinator ! PullCommittedLogs(committedLogsAdapter)
        this
      
      case WrappedCommittedLogs(CommittedLogs(logs)) =>
        val newLogs = logs.filter(_._1 > lastPulledIndex)
        if (newLogs.nonEmpty) {
          context.log.info(s"Pulled ${newLogs.size} new committed logs to write to Qdrant.")
          processLogs(newLogs)
          lastPulledIndex = newLogs.map(_._1).max
        }
        this
    }
  }

  private def processLogs(logs: Seq[(Long, String)]): Unit = {
    logs.grouped(batchSize).foreach { batch =>
      // Here we assume the payload is a JSON that can be parsed into QdrantPoint
      // In a real system, payload parsing requires robust error handling.
      val points = batch.flatMap { case (id, payload) =>
        // This parsing is fragile and for demonstration only.
        try {
          // Assuming payload is "vector_json|payload_json"
          val parts = payload.split('|')
          val vector = parts(0).parseJson.convertTo[List[Float]]
          val metadata = parts(1).parseJson.convertTo[Map[String, String]]
          Some(QdrantPoint(id.toString, vector, metadata))
        } catch {
          case e: Exception =>
            context.log.error(s"Failed to parse log entry $id. Skipping. Error: ${e.getMessage}")
            None // Skip malformed data
        }
      }

      if (points.nonEmpty) {
        val requestEntity = HttpEntity(
          ContentTypes.`application/json`,
          UpsertRequest(points).toJson.compactPrint
        )
        
        val targetUri = qdrantUri.withPath(Uri.Path(s"/collections/$collectionName/points"))
        
        val request = HttpRequest(
          method = HttpMethods.PUT,
          uri = targetUri.withQuery(Uri.Query("wait" -> "true")), // wait for indexing
          headers = List(RawHeader("api-key", qdrantApiKey)),
          entity = requestEntity
        )

        http.singleRequest(request).onComplete {
          case Success(res) if res.status.isSuccess() =>
            context.log.info(s"Successfully upserted batch of ${points.size} points to Qdrant.")
            res.entity.discardBytes()
          case Success(res) =>
            context.log.error(s"Failed to upsert batch to Qdrant. Status: ${res.status}. Response: ${res.entity.toString}")
            // Implement retry or dead-letter queue logic here.
            res.entity.discardBytes()
          case Failure(ex) =>
            context.log.error("HTTP request to Qdrant failed.", ex)
            // Implement retry logic here.
        }
      }
    }
  }
}

代码解析:

  • 解耦: QdrantWriter完全独立运行,通过定时拉取Pull消息与ReplicationCoordinator通信。
  • 批量处理: 它将日志分批(batchSize),以提高写入Qdrant的效率。
  • 健壮性: 在生产代码中,processLogs中的HTTP请求部分必须包含更复杂的错误处理,例如指数退避重试、持久化失败的日志到死信队列等。这里的实现仅为演示。
  • 数据格式: 我们假设日志的payload是一个可解析为向量和元数据的字符串。实际项目中,这里会使用Protobuf或Avro等更高效的序列化格式。

网关入口与系统组装

最后,我们使用Akka HTTP构建网关的HTTP服务入口,并将所有Actor组装起来。

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.io.StdIn

object IngestionGatewayApp {

  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load("application.conf")
    val minIsr = config.getInt("replication.min-isr")
    val qdrantHost = config.getString("qdrant.host")
    val qdrantPort = config.getInt("qdrant.port")
    val qdrantApiKey = config.getString("qdrant.api-key")
    val collectionName = config.getString("qdrant.collection-name")

    val rootBehavior = Behaviors.setup[Nothing] { context =>
      val coordinator = context.spawn(ReplicationCoordinator(minIsr), "ReplicationCoordinator")
      
      // Spawn a few followers for demonstration
      context.spawn(LogFollower(coordinator), "Follower-1")
      context.spawn(LogFollower(coordinator), "Follower-2")
      
      val qdrantUri = Uri(s"http://$qdrantHost:$qdrantPort")
      context.spawn(QdrantWriter(coordinator, qdrantUri, qdrantApiKey, collectionName, 100), "QdrantWriter")

      startHttpServer(coordinator)(context.system)

      Behaviors.empty
    }
    
    val system = ActorSystem[Nothing](rootBehavior, "IngestionGatewaySystem")
  }

  def startHttpServer(coordinator: ActorRef[ReplicationProtocol.Command])(implicit system: ActorSystem[_]): Unit = {
    import ReplicationProtocol._
    implicit val timeout: Timeout = 3.seconds
    import system.executionContext

    val route =
      path("ingest") {
        post {
          entity(as[String]) { payload =>
            val result: Future[WriteResult] = coordinator.ask(ref => Write(payload, ref))
            onComplete(result) {
              case Success(WriteSuccess(logId)) =>
                complete(StatusCodes.Accepted, s"Write accepted with logId: $logId")
              case Success(WriteFailure(_, reason)) =>
                complete(StatusCodes.InternalServerError, s"Write failed: $reason")
              case Failure(ex) =>
                complete(StatusCodes.ServiceUnavailable, s"Request timed out: ${ex.getMessage}")
            }
          }
        }
      }

    val bindingFuture = Http().newServerAt("localhost", 8080).bind(route)
    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine()
    bindingFuture
      .flatMap(_.unbind())
      .onComplete(_ => system.terminate())
  }
}

配置文件 (application.conf):

replication {
  min-isr = 2 # Leader + at least one follower must ACK
}

qdrant {
  host = "localhost"
  port = 6333
  api-key = "your-qdrant-api-key" # Can be empty if not set
  collection-name = "my_test_collection"
}

当前方案的局限性与未来展望

这个实现虽然展示了核心思想,但在生产环境中仍有诸多不足:

  1. 无Leader选举: 当前ReplicationCoordinator是一个单点。如果它崩溃,整个写入服务将不可用。一个完整的方案需要基于Raft或Paxos实现自动的Leader选举,Akka Cluster Sharding和Singleton可以作为实现此功能的基础。
  2. 日志持久化: 所有日志都存储在Actor的内存中。重启将导致数据丢失。需要引入磁盘持久化,例如使用Akka Persistence,将事件日志存储到RocksDB或Cassandra等后端。
  3. Follower追赶逻辑: Follower的实现非常初级,无法处理乱序或丢失的Replicate消息。它需要一个更复杂的机制来从Leader处拉取缺失的日志以完成追赶。
  4. 背压与流控: 如果写入速度远超Qdrant的处理能力,内存中的日志会无限增长。需要实现背压机制,当commitIndexnextLogId差距过大时,网关应主动限流,返回429 Too Many Requests
  5. Exactly-Once投递: QdrantWriter的实现是at-least-once。如果写入Qdrant成功后,在更新lastPulledIndex之前崩溃,重启后会重复写入。实现exactly-once需要将lastPulledIndex与写入操作在Qdrant中原子地完成,或者使用幂等写入。

尽管存在这些局限,该架构成功地将一个脆弱的单点写入模式,转变为一个弹性的、高可用的摄入层。它通过牺牲数据写入的即时可见性,换取了系统入口的稳定性和数据持久性的强有力保证,这在许多数据密集型应用中是一种必要且明智的权衡。


  目录