If write requests at the system's entry point are passed straight through to a stateful backend service, especially a vector database like Qdrant, the design is immediately fragile: a brief database hiccup, a network partition, or excessive write pressure turns directly into failed requests. Clients are forced to implement complex retry logic, and the system's overall write availability depends entirely on the instantaneous state of the database. In a production environment this is unacceptable.
We need a durable buffering layer at the gateway. The conventional answer is to introduce Kafka or Pulsar, but for some scenarios that is too heavyweight. Our goal is a lightweight, highly available ingestion gateway that buffers and replicates write requests, guarantees that no data is lost before it reaches Qdrant, and gives clients a stable, high-throughput entry point.
We will use Scala and the Akka Typed actor model to implement, from scratch, a lightweight distributed log service based on the In-Sync Replicas (ISR) concept, and integrate it into a proxy gateway dedicated to highly available writes into a Qdrant vector collection. The gateway's core responsibilities are: accept a write request, persist it into the internal replicated log, return success to the client as soon as the entry is agreed upon across the ISR nodes, and then asynchronously and reliably push the data to Qdrant in batches.
Architecture: Decoupling the Write Acknowledgement from Final Persistence
The overall architecture is built around one goal: decouple the client's write acknowledgement (ack) from the actual write into Qdrant.
sequenceDiagram
    participant Client
    participant IngestionGateway as IngestionGateway (Scala/Akka HTTP)
    participant ReplicationCoordinator as ReplicationCoordinator (Leader Actor)
    participant LogFollower1 as LogFollower1 (Follower Actor)
    participant LogFollower2 as LogFollower2 (Follower Actor)
    participant QdrantWriter as QdrantWriter (Actor)
    participant Qdrant
    Client->>+IngestionGateway: POST /v1/collections/my_collection/points (vector data)
    IngestionGateway->>+ReplicationCoordinator: WriteOp(id, payload)
    ReplicationCoordinator->>ReplicationCoordinator: Append to own log (offset: N)
    ReplicationCoordinator->>+LogFollower1: Replicate(offset: N, payload)
    ReplicationCoordinator->>+LogFollower2: Replicate(offset: N, payload)
    LogFollower1-->>-ReplicationCoordinator: Ack(offset: N)
    Note right of ReplicationCoordinator: ISR set = {Self, Follower1}. Follower2 is lagging.
    ReplicationCoordinator-->>-IngestionGateway: WriteConfirmed(id)
    IngestionGateway-->>-Client: HTTP 202 Accepted
    LogFollower2-->>-ReplicationCoordinator: Ack(offset: N)
    Note right of ReplicationCoordinator: Follower2 catches up, added to ISR.
    QdrantWriter->>ReplicationCoordinator: PullCommittedLogs
    ReplicationCoordinator-->>QdrantWriter: [Op(N), Op(N+1), ...]
    QdrantWriter->>+Qdrant: Upsert Points (batch)
    Qdrant-->>-QdrantWriter: Success
The key point of this flow is that `IngestionGateway` returns `202 Accepted` to the client based on the acknowledgement from `ReplicationCoordinator`, not from Qdrant. The condition for that acknowledgement is that the write has been confirmed by every replica in the ISR set, which gives us a configurable durability guarantee.
Core Components: Akka Typed Actors
We will build this concurrent system with Akka Typed. The actor model is a natural fit for distributed components that have to manage state, process messages, and tolerate failures.
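The code that follows assumes an Akka 2.6-era API surface (Akka Typed, Akka HTTP, spray-json). A minimal build.sbt sketch is shown below; the version numbers are illustrative assumptions, not pinned requirements, and should be aligned with your own setup:

```scala
// build.sbt (sketch) -- versions are assumptions, adjust to your environment
val AkkaVersion     = "2.6.20"
val AkkaHttpVersion = "10.2.10"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor-typed"         % AkkaVersion,
  "com.typesafe.akka" %% "akka-stream"              % AkkaVersion,
  "com.typesafe.akka" %% "akka-http"                % AkkaHttpVersion,
  "io.spray"          %% "spray-json"               % "1.3.6",
  "com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test
)
```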
1. Actor Protocol Definition (Protocol.scala)
First we define the communication contract between all participants. This is the first step in building a strongly typed actor system.
import akka.actor.typed.ActorRef
object ReplicationProtocol {
// Common trait for all commands
sealed trait Command
// Client-facing command sent to the coordinator
final case class Write(payload: String, replyTo: ActorRef[WriteResult]) extends Command
sealed trait WriteResult
final case class WriteSuccess(correlationId: Long) extends WriteResult
final case class WriteFailure(correlationId: Long, reason: String) extends WriteResult
// Internal replication commands
// These messages are exchanged between the coordinator and the follower actors,
// so they must remain visible outside this object.
final case class Replicate(logId: Long, payload: String) extends Command
final case class ReplicationAck(logId: Long, from: ActorRef[Command]) extends Command
// Follower management
final case class RegisterFollower(follower: ActorRef[Command]) extends Command
final case class FollowerHeartbeat(from: ActorRef[Command]) extends Command
case object CheckHeartbeatTimeout extends Command
// Data pulling for Qdrant writer
final case class PullCommittedLogs(replyTo: ActorRef[CommittedLogs]) extends Command
final case class CommittedLogs(logs: Seq[(Long, String)])
}
2. Replication Coordinator (ReplicationCoordinator.scala)
This is the heart of the system and plays the leader role. It accepts client writes, replicates log entries to the followers, maintains the ISR set, and tracks the committed log offset.
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors, TimerScheduler}
import scala.collection.mutable
import scala.concurrent.duration._
object ReplicationCoordinator {
import ReplicationProtocol._
def apply(minIsr: Int): Behavior[Command] =
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
new ReplicationCoordinator(context, timers, minIsr)
}
}
}
class ReplicationCoordinator(
context: ActorContext[ReplicationProtocol.Command],
timers: TimerScheduler[ReplicationProtocol.Command],
minIsr: Int
) extends AbstractBehavior[ReplicationProtocol.Command](context) {
import ReplicationProtocol._
// State of the leader
private val log = mutable.ArrayBuffer.empty[(Long, String)]
private var nextLogId: Long = 0L
private var followers = Set.empty[ActorRef[Command]]
private var inSyncReplicas = Set.empty[ActorRef[Command]]
// Tracking pending replications for each log entry
private val pendingAcks = mutable.Map.empty[Long, (Set[ActorRef[Command]], ActorRef[WriteResult])]
// High-water mark: log entries up to this ID are committed
private var commitIndex: Long = -1L
// For follower health checks
private val lastHeartbeats = mutable.Map.empty[ActorRef[Command], Long]
private val heartbeatTimeout = 5.seconds
private val heartbeatCheckKey = "heartbeat-check"
// Add self to ISR set
inSyncReplicas += context.self
timers.startTimerAtFixedRate(heartbeatCheckKey, CheckHeartbeatTimeout, heartbeatTimeout)
override def onMessage(msg: Command): Behavior[Command] = {
msg match {
case Write(payload, replyTo) =>
handleWrite(payload, replyTo)
this
case ReplicationAck(logId, from) =>
handleReplicationAck(logId, from)
this
case RegisterFollower(follower) =>
context.log.info(s"Registering new follower: ${follower.path.name}")
followers += follower
lastHeartbeats(follower) = System.currentTimeMillis()
// A new follower is not in sync until it catches up
this
case FollowerHeartbeat(from) =>
if (followers.contains(from)) {
lastHeartbeats(from) = System.currentTimeMillis()
// If a follower was out of sync, this could be a trigger to try and catch it up
// For simplicity, we only add it back to ISR on an actual log ACK.
}
this
case CheckHeartbeatTimeout =>
checkFollowerHealth()
this
case PullCommittedLogs(replyTo) =>
// Only return logs that are safely committed
val logsToReturn = log.slice(0, (commitIndex + 1).toInt).toSeq
replyTo ! CommittedLogs(logsToReturn)
this
}
}
private def handleWrite(payload: String, replyTo: ActorRef[WriteResult]): Unit = {
val currentLogId = nextLogId
nextLogId += 1
log.append((currentLogId, payload))
context.log.info(s"Received write for log $currentLogId. Waiting for acks from ${inSyncReplicas.size} ISR member(s).")
// Track which ISR members need to acknowledge this write
pendingAcks(currentLogId) = (inSyncReplicas, replyTo)
// Replicate to every registered follower. Only the ISR snapshot above must ack before the
// client is confirmed, but lagging followers also receive the entry so they can catch up
// and rejoin the ISR.
val replicateMsg = Replicate(currentLogId, payload)
followers.foreach(_ ! replicateMsg)
// The leader acks its own write immediately. nextLogId was already advanced above, so
// updateCommitIndex() can include this entry as soon as the ISR has confirmed it.
handleReplicationAck(currentLogId, context.self)
}
private def handleReplicationAck(logId: Long, from: ActorRef[Command]): Unit = {
// A follower (newly registered or previously lagging) proves it has caught up by acking.
// Add it to the ISR even if the entry is already committed; otherwise it could never rejoin.
if (followers.contains(from) && !inSyncReplicas.contains(from)) {
context.log.info(s"Follower ${from.path.name} caught up at log $logId. Adding to ISR.")
inSyncReplicas += from
}
pendingAcks.get(logId) match {
case Some((pending, replyTo)) =>
val updatedPending = pending - from
if (updatedPending.isEmpty) {
// All replicas in the ISR set at the time of writing have acknowledged.
context.log.info(s"Log $logId fully replicated within ISR set.")
pendingAcks.remove(logId)
// Advance the commit index. This is crucial.
updateCommitIndex()
// Now we can reply to the original client
replyTo ! WriteSuccess(logId)
} else {
// Update the pending set
pendingAcks(logId) = (updatedPending, replyTo)
}
case None =>
// This could be a late ACK for an already committed log, which is fine.
// Or an ack from a follower that was not in ISR, which is also fine.
}
}
private def updateCommitIndex(): Unit = {
// The commit index is the highest log ID that has been acked by all replicas in the current ISR set.
// This is a simplified model. A more robust implementation would track acknowledgements per replica.
// Here, we advance commitIndex to the highest consecutively successful log write.
while (commitIndex + 1 < nextLogId && !pendingAcks.contains(commitIndex + 1)) {
commitIndex += 1
}
context.log.info(s"Commit index advanced to $commitIndex")
}
private def checkFollowerHealth(): Unit = {
val now = System.currentTimeMillis()
val deadFollowers = followers.filter { f =>
now - lastHeartbeats.getOrElse(f, now) > heartbeatTimeout.toMillis
}
if (deadFollowers.nonEmpty) {
context.log.warn(s"Followers timed out and removed from ISR: $deadFollowers")
followers --= deadFollowers
inSyncReplicas --= deadFollowers
lastHeartbeats --= deadFollowers
// If ISR size drops below minimum, we can no longer guarantee durability.
// In a real system, this should trigger an alarm or even halt new writes.
if (inSyncReplicas.size < minIsr) {
context.log.error(s"CRITICAL: ISR size (${inSyncReplicas.size}) is below minimum required ($minIsr). Write durability is at risk.")
// Here we could implement a "circuit breaker" logic to reject new writes.
// For now, we just log an error.
}
}
}
}
Code walkthrough:
- State management: `log`, `followers`, `inSyncReplicas`, `pendingAcks`, and `commitIndex` make up the leader's entire state.
- ISR maintenance: `checkFollowerHealth` is the core of ISR management. It periodically checks heartbeats and removes timed-out followers from the `followers` and `inSyncReplicas` sets. If `inSyncReplicas` drops below the configured `minIsr`, the system is in a degraded state.
- Write path: `handleWrite` accepts a request, appends the entry to the local log, and broadcasts a `Replicate` message to the registered followers; acknowledgements are tracked per log entry in `pendingAcks`, against the ISR snapshot taken at write time.
- Acknowledgement and commit: `handleReplicationAck` updates `pendingAcks` as follower acks arrive. Once every replica in an entry's pending set has acknowledged, the entry has reached agreement inside the ISR. Only then do we confirm the write to the client and try to advance `commitIndex`.
- Data visibility: `PullCommittedLogs` only returns entries up to `commitIndex`, so a downstream consumer (`QdrantWriter`) never reads data that has not yet been fully replicated.
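To make the write/ack flow concrete, here is a minimal smoke-test sketch using akka-actor-testkit-typed (listed in the build sketch earlier). It assumes the simplest configuration, minIsr = 1 with no followers registered, so the leader's own ack is enough to commit; the object name and payload are illustrative only:

```scala
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import ReplicationProtocol._

object CoordinatorSmokeTest extends App {
  val testKit = ActorTestKit()

  // Leader-only configuration: with minIsr = 1 the leader's self-ack commits the entry.
  val coordinator = testKit.spawn(ReplicationCoordinator(minIsr = 1), "coordinator")

  // The coordinator confirms the write once the ISR snapshot (here: just itself) has acked.
  val writeProbe = testKit.createTestProbe[WriteResult]()
  coordinator ! Write("""[0.1, 0.2]|{"source":"smoke-test"}""", writeProbe.ref)
  writeProbe.expectMessage(WriteSuccess(0L)) // the first entry gets logId 0

  // Committed entries are now visible to downstream consumers such as QdrantWriter.
  val pullProbe = testKit.createTestProbe[CommittedLogs]()
  coordinator ! PullCommittedLogs(pullProbe.ref)
  println(pullProbe.receiveMessage().logs) // expected: a single committed entry with logId 0

  testKit.shutdownTestKit()
}
```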
3. Log Follower (LogFollower.scala)
The follower's role is comparatively simple: receive log entries, store them, and send heartbeats and ACKs.
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors, TimerScheduler}
import scala.collection.mutable
import scala.concurrent.duration._
object LogFollower {
import ReplicationProtocol._
def apply(coordinator: ActorRef[Command]): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
new LogFollower(context, timers, coordinator)
}
}
}
}
class LogFollower(
context: ActorContext[ReplicationProtocol.Command],
timers: TimerScheduler[ReplicationProtocol.Command],
coordinator: ActorRef[ReplicationProtocol.Command]
) extends AbstractBehavior[ReplicationProtocol.Command](context) {
import ReplicationProtocol._
private val log = mutable.Map.empty[Long, String]
private var nextExpectedLogId: Long = 0L
private val heartbeatInterval = 2.seconds
private val heartbeatKey = "follower-heartbeat"
override def onMessage(msg: Command): Behavior[Command] = {
msg match {
case Replicate(logId, payload) =>
// For simplicity, we assume messages arrive in order.
// A production system would need to handle out-of-order messages, gaps, etc.
if (logId == nextExpectedLogId) {
context.log.info(s"Replicating log $logId")
log(logId) = payload
nextExpectedLogId += 1
coordinator ! ReplicationAck(logId, context.self)
} else {
context.log.warn(s"Received out-of-order log. Expected $nextExpectedLogId, but got $logId. Ignoring.")
// In a real scenario, the follower should request missing logs from the leader.
}
this
case FollowerHeartbeat(_) =>
// The periodic timer delivers this message to ourselves; forward a heartbeat to the coordinator.
sendHeartbeat()
this
case _ =>
// Followers ignore other messages
this
}
}
private def sendHeartbeat(): Unit = {
coordinator ! FollowerHeartbeat(context.self)
}
// Initial registration, then a periodic timer. The timer sends FollowerHeartbeat(self) to this
// actor, and onMessage forwards each tick to the coordinator as a heartbeat.
coordinator ! RegisterFollower(context.self)
timers.startTimerAtFixedRate(heartbeatKey, FollowerHeartbeat(context.self), heartbeatInterval)
}
4. Qdrant Writer (QdrantWriter.scala)
This is an independent actor that periodically pulls committed log entries from the `ReplicationCoordinator` and writes them to Qdrant in batches. The pattern isolates the pressure and unpredictability of database writes from the main write path.
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors, TimerScheduler}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.RawHeader
import spray.json._ // Assuming spray-json for JSON handling
import scala.concurrent.duration._
import scala.util.{Failure, Success}
// A simple representation of a Qdrant point for JSON serialization.
// Qdrant point IDs must be unsigned integers or UUIDs, so we carry the log offset as a Long.
case class QdrantPoint(id: Long, vector: List[Float], payload: Map[String, String])
case class UpsertRequest(points: Seq[QdrantPoint])
object MyJsonProtocol extends DefaultJsonProtocol {
implicit val qdrantPointFormat: RootJsonFormat[QdrantPoint] = jsonFormat3(QdrantPoint)
implicit val upsertRequestFormat: RootJsonFormat[UpsertRequest] = jsonFormat1(UpsertRequest)
}
object QdrantWriter {
import ReplicationProtocol._
sealed trait QdrantWriterCommand
private case object Pull extends QdrantWriterCommand
private final case class WrappedCommittedLogs(logs: CommittedLogs) extends QdrantWriterCommand
def apply(
coordinator: ActorRef[Command],
qdrantUri: Uri,
qdrantApiKey: String,
collectionName: String,
batchSize: Int
): Behavior[QdrantWriterCommand] =
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
new QdrantWriter(context, timers, coordinator, qdrantUri, qdrantApiKey, collectionName, batchSize)
}
}
}
class QdrantWriter(
context: ActorContext[QdrantWriter.QdrantWriterCommand],
timers: TimerScheduler[QdrantWriter.QdrantWriterCommand],
coordinator: ActorRef[ReplicationProtocol.Command],
qdrantUri: Uri,
qdrantApiKey: String,
collectionName: String,
batchSize: Int
) extends AbstractBehavior[QdrantWriter.QdrantWriterCommand](context) {
import QdrantWriter._
import ReplicationProtocol._
import MyJsonProtocol._
import context.executionContext
// The typed system doubles as the ClassicActorSystemProvider that supplies the implicit
// Materializer needed by entity.discardBytes() in the response handling below.
private implicit val system: akka.actor.typed.ActorSystem[Nothing] = context.system
private var lastPulledIndex: Long = -1L
private val http = Http(system)
private val committedLogsAdapter: ActorRef[CommittedLogs] = context.messageAdapter(WrappedCommittedLogs.apply)
timers.startTimerAtFixedRate("qdrant-pull-timer", Pull, 5.seconds)
override def onMessage(msg: QdrantWriterCommand): Behavior[QdrantWriterCommand] = {
msg match {
case Pull =>
coordinator ! PullCommittedLogs(committedLogsAdapter)
this
case WrappedCommittedLogs(CommittedLogs(logs)) =>
val newLogs = logs.filter(_._1 > lastPulledIndex)
if (newLogs.nonEmpty) {
context.log.info(s"Pulled ${newLogs.size} new committed logs to write to Qdrant.")
processLogs(newLogs)
lastPulledIndex = newLogs.map(_._1).max
}
this
}
}
private def processLogs(logs: Seq[(Long, String)]): Unit = {
logs.grouped(batchSize).foreach { batch =>
// Here we assume the payload is a JSON that can be parsed into QdrantPoint
// In a real system, payload parsing requires robust error handling.
val points = batch.flatMap { case (id, payload) =>
// This parsing is fragile and for demonstration only.
try {
// Assuming payload is "vector_json|payload_json"
val parts = payload.split('|')
val vector = parts(0).parseJson.convertTo[List[Float]]
val metadata = parts(1).parseJson.convertTo[Map[String, String]]
Some(QdrantPoint(id, vector, metadata))
} catch {
case e: Exception =>
context.log.error(s"Failed to parse log entry $id. Skipping. Error: ${e.getMessage}")
None // Skip malformed data
}
}
if (points.nonEmpty) {
val requestEntity = HttpEntity(
ContentTypes.`application/json`,
UpsertRequest(points).toJson.compactPrint
)
val targetUri = qdrantUri.withPath(Uri.Path(s"/collections/$collectionName/points"))
val request = HttpRequest(
method = HttpMethods.PUT,
uri = targetUri.withQuery(Uri.Query("wait" -> "true")), // wait for indexing
headers = List(RawHeader("api-key", qdrantApiKey)),
entity = requestEntity
)
// The Future callback runs outside the actor, so use the thread-safe system logger
// rather than context.log, which must only be touched from inside the actor.
val log = system.log
http.singleRequest(request).onComplete {
case Success(res) if res.status.isSuccess() =>
log.info(s"Successfully upserted batch of ${points.size} points to Qdrant.")
res.entity.discardBytes()
case Success(res) =>
log.error(s"Failed to upsert batch to Qdrant. Status: ${res.status}.")
// Implement retry or dead-letter queue logic here.
res.entity.discardBytes()
case Failure(ex) =>
log.error("HTTP request to Qdrant failed.", ex)
// Implement retry logic here.
}
}
}
}
}
Code walkthrough:
- Decoupling: `QdrantWriter` runs completely independently and talks to the `ReplicationCoordinator` only through the periodic `Pull` message.
- Batching: it groups log entries into batches of `batchSize` to make writes to Qdrant more efficient.
- Robustness: in production code, the HTTP request in `processLogs` must carry far more elaborate error handling, for example retries with exponential backoff and persisting failed entries to a dead-letter queue (a retry sketch follows this list). The implementation here is for demonstration only.
- Data format: we assume the log `payload` is a string that can be parsed into a vector plus metadata. A real project would use a more efficient serialization format such as Protobuf or Avro.
Gateway Entry Point and System Assembly
Finally, we build the gateway's HTTP entry point with Akka HTTP and wire all the actors together.
import akka.actor.typed.{ActorRef, ActorSystem}
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}
object IngestionGatewayApp {
def main(args: Array[String]): Unit = {
val config = ConfigFactory.load("application.conf")
val minIsr = config.getInt("replication.min-isr")
val qdrantHost = config.getString("qdrant.host")
val qdrantPort = config.getInt("qdrant.port")
val qdrantApiKey = config.getString("qdrant.api-key")
val collectionName = config.getString("qdrant.collection-name")
val rootBehavior = Behaviors.setup[Nothing] { context =>
val coordinator = context.spawn(ReplicationCoordinator(minIsr), "ReplicationCoordinator")
// Spawn a few followers for demonstration
context.spawn(LogFollower(coordinator), "Follower-1")
context.spawn(LogFollower(coordinator), "Follower-2")
val qdrantUri = Uri(s"http://$qdrantHost:$qdrantPort")
context.spawn(QdrantWriter(coordinator, qdrantUri, qdrantApiKey, collectionName, 100), "QdrantWriter")
startHttpServer(coordinator)(context.system)
Behaviors.empty
}
val system = ActorSystem[Nothing](rootBehavior, "IngestionGatewaySystem")
}
def startHttpServer(coordinator: ActorRef[ReplicationProtocol.Command])(implicit system: ActorSystem[_]): Unit = {
import ReplicationProtocol._
implicit val timeout: Timeout = 3.seconds
import system.executionContext
val route =
path("ingest") {
post {
entity(as[String]) { payload =>
val result: Future[WriteResult] = coordinator.ask(ref => Write(payload, ref))
onComplete(result) {
case Success(WriteSuccess(logId)) =>
complete(StatusCodes.Accepted, s"Write accepted with logId: $logId")
case Success(WriteFailure(_, reason)) =>
complete(StatusCodes.InternalServerError, s"Write failed: $reason")
case Failure(ex) =>
complete(StatusCodes.ServiceUnavailable, s"Request timed out: ${ex.getMessage}")
}
}
}
}
val bindingFuture = Http().newServerAt("localhost", 8080).bind(route)
// Avoid blocking calls inside the guardian's setup; report the binding result and let the
// system run until the process is stopped.
bindingFuture.onComplete {
case Success(binding) =>
system.log.info(s"Ingestion gateway online at http://${binding.localAddress.getHostString}:${binding.localAddress.getPort}/")
case Failure(ex) =>
system.log.error("Failed to bind HTTP endpoint, terminating system", ex)
system.terminate()
}
}
}
Configuration file (application.conf):
replication {
min-isr = 2 # Leader + at least one follower must ACK
}
qdrant {
host = "localhost"
port = 6333
api-key = "your-qdrant-api-key" # Can be empty if not set
collection-name = "my_test_collection"
}
Limitations and Future Work
This implementation demonstrates the core ideas, but it falls short of production readiness in several ways:
- No leader election: the current `ReplicationCoordinator` is a single point of failure. If it crashes, the entire write service becomes unavailable. A complete solution needs automatic leader election based on Raft or Paxos; Akka Cluster Sharding and Cluster Singleton can serve as building blocks for this.
- Log persistence: all log entries live in actor memory, so a restart loses data. Disk persistence is required, for example via Akka Persistence with an event journal backed by RocksDB or Cassandra.
- Follower catch-up: the follower implementation is rudimentary and cannot handle out-of-order or missing `Replicate` messages. It needs a proper mechanism to fetch missing entries from the leader in order to catch up.
- Backpressure and flow control: if writes arrive faster than Qdrant can absorb them, the in-memory log grows without bound. The gateway needs a backpressure mechanism: when the gap between `commitIndex` and `nextLogId` grows too large, it should throttle and return `429 Too Many Requests` (a minimal sketch follows this list).
- Exactly-once delivery: `QdrantWriter` is at-least-once. If it crashes after a successful Qdrant write but before updating `lastPulledIndex`, the batch is written again after restart. Exactly-once requires committing `lastPulledIndex` atomically with the write in Qdrant, or relying on idempotent writes.
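For the backpressure item, here is a minimal sketch of the admission check; the threshold and the helper object are assumptions, not part of the implementation above:

```scala
object WriteBackpressure {
  // Assumed limit on the number of entries sitting between commitIndex and nextLogId.
  // Tune this against the gateway's memory budget and Qdrant's sustained write throughput.
  val MaxUncommitted: Long = 10000L

  /** True while the uncommitted backlog is small enough to accept another write. */
  def canAccept(nextLogId: Long, commitIndex: Long): Boolean =
    nextLogId - (commitIndex + 1) < MaxUncommitted
}
```

The coordinator would call `WriteBackpressure.canAccept(nextLogId, commitIndex)` at the top of `handleWrite` and reply with a `WriteFailure` when it returns false; the HTTP route would then map that failure to `StatusCodes.TooManyRequests` rather than the current `InternalServerError`.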
Despite these limitations, the architecture turns a fragile, single-point write path into a resilient, highly available ingestion layer. It trades immediate visibility of written data for a stable entry point and strong durability guarantees, which is a necessary and sensible trade-off for many data-intensive applications.