基于Rust与Zookeeper构建动态可配置的分布式WAF核心


我们面临一个典型但棘手的运维问题:线上WAF(Web应用防火墙)集群的规则更新。传统的做法是修改配置文件,然后逐台滚动重启服务。在小规模部署下这还能接受,但在拥有数百个网关节点的环境中,这个过程不仅缓慢、风险高,而且短暂的服务中断或延迟都可能对业务造成影响。我们需要一个方案,能够将WAF规则近乎实时地、安全地推送到整个集群,且无需重启任何节点。

最初的构想是建立一个控制平面和数据平面的分离架构。数据平面需要极致的性能和内存安全,毕竟它处理的是所有入口流量,任何一个漏洞或性能瓶颈都将是灾难性的。控制平面则需要一个可靠的、能被所有数据平面节点监听的“真理之源”,以及一个供安全运营人员使用的简单管理界面。

技术选型决策很快就清晰了:

  1. 数据平面 - WAF核心引擎: Rust。选择Rust并非为了追赶潮流。在安全领域,C/C++长期是性能的首选,但内存安全问题始终是悬顶之剑。Rust通过其所有权和借用检查机制,在编译期就消除了这类风险,同时提供了与C/C++相媲美的性能。这对于一个处在流量入口、直接暴露在攻击面下的WAF核心来说,是决定性的优势。

  2. 配置分发与协调: Zookeeper。我们需要一个成熟、高可用的分布式协调服务。Zookeeper的Watcher机制是这个场景的完美匹配。数据平面节点可以监听(Watch)Zookeeper上的特定ZNode,一旦规则数据发生变更,Zookeeper会立即通知所有监听的节点。这正是我们实现“实时推送”的关键。虽然有etcd等更现代的替代品,但Zookeeper的稳定性和在大型分布式系统中的广泛应用使其成为一个可靠、低风险的选择。

  3. 管理界面: Flutter。我们需要一个简单的桌面客户端给运营团队使用,用来增删改查规则。Flutter能够快速构建跨平台(Windows, macOS, Linux)的桌面应用,开发效率高,UI表现力也不错。对于这样一个内部工具,Flutter是成本效益极佳的选择。

  4. 测试策略: 这是整个项目的基石。单元测试保证Rust核心逻辑的正确性;集成测试验证Rust节点与Zookeeper的交互;端到端测试则模拟从Flutter界面修改规则到Rust节点生效的全链路流程。

第一步:设计Rust WAF微内核

WAF的核心是规则引擎。我们需要设计一套可序列化的数据结构来定义规则,并在Rust引擎中高效地执行它们。

定义规则结构

我们先用 serde 来定义规则的数据结构,以便后续能用JSON格式在Zookeeper中存储和传输。

// src/rules.rs

use serde::{Deserialize, Serialize};
use std::collections::HashMap;

/// 规则匹配的目标位置
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum MatchTarget {
    Header,
    Query,
    Body,
    Uri,
}

/// 具体的匹配条件
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Condition {
    // 匹配目标,例如:Header, Query参数等
    pub target: MatchTarget,
    // 如果是Header或Query, 此字段指定具体的key
    #[serde(default)]
    pub key: Option<String>,
    // 匹配操作,这里简化为包含子串
    pub matcher: String,
}

/// 匹配成功后执行的动作
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum Action {
    Block,
    Log,
    Allow,
}

/// 单条WAF规则
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Rule {
    pub id: u32,
    pub description: String,
    pub conditions: Vec<Condition>,
    pub action: Action,
}

/// 规则集,这是我们将在Zookeeper中存储的完整对象
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RuleSet {
    pub version: u64,
    pub rules: Vec<Rule>,
}

impl RuleSet {
    /// 一个简单的辅助函数,用于将规则集序列化为JSON字符串
    pub fn to_json(&self) -> Result<String, serde_json::Error> {
        serde_json::to_string_pretty(self)
    }

    /// 从JSON字节流反序列化
    pub fn from_slice(bytes: &[u8]) -> Result<Self, serde_json::Error> {
        serde_json::from_slice(bytes)
    }
}

实现规则匹配引擎

引擎的核心职责是接收一个模拟的HTTP请求,并根据当前的规则集进行匹配。这里的关键在于如何管理规则集。规则的更新必须是原子的,并且不能阻塞正在处理请求的线程。Arc<RwLock<T>> 是解决这个问题的标准模式。

Arc(原子引用计数指针)允许多个线程安全地共享同一份数据。RwLock(读写锁)则允许多个读线程同时访问数据,但在写操作时会独占访问权。

// src/engine.rs

use crate::rules::{Action, Condition, MatchTarget, Rule, RuleSet};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use log::{info, warn};

// 模拟一个简化的HTTP请求
pub struct MockRequest<'a> {
    pub uri: &'a str,
    pub headers: HashMap<String, String>,
    pub query_params: HashMap<String, String>,
    pub body: &'a str,
}

pub struct WafEngine {
    // 使用Arc<RwLock<>>来安全地、原子地更新规则集
    rule_set: Arc<RwLock<RuleSet>>,
}

impl WafEngine {
    pub fn new() -> Self {
        Self {
            rule_set: Arc::new(RwLock::new(RuleSet::default())),
        }
    }

    /// 核心处理逻辑
    /// 接收一个请求,返回最终的WAF动作
    pub fn process_request(&self, request: &MockRequest) -> Action {
        // 获取规则集的读锁。这允许多个请求并发处理。
        // .read().unwrap() 在真实项目中应该有更健壮的错误处理,
        // 但对于RwLock,如果持有锁的线程panic,它会变成poisoned,unwrap会panic。
        // 在我们的场景下,读锁的持有时间极短,风险很低。
        let rules = self.rule_set.read().unwrap();

        for rule in &rules.rules {
            if self.is_match(request, rule) {
                info!("Request matched rule {}: '{}'. Action: {:?}", rule.id, rule.description, rule.action);
                // 一旦匹配到规则,立即返回动作
                return rule.action.clone();
            }
        }

        // 默认放行
        Action::Allow
    }
    
    /// 更新规则集。这是被Zookeeper回调调用的函数。
    pub fn update_rules(&self, new_rule_set: RuleSet) {
        // 获取写锁。此时所有尝试获取读锁或写锁的线程都会被阻塞。
        // 操作必须非常快,以避免阻塞处理请求的线程。
        let mut guard = self.rule_set.write().unwrap();
        info!("Updating rules to version {}. Previous version was {}.", new_rule_set.version, guard.version);
        // 直接替换内存中的整个规则集对象,这是一个原子操作。
        *guard = new_rule_set;
    }

    fn is_match(&self, request: &MockRequest, rule: &Rule) -> bool {
        // 所有条件都必须满足 (AND逻辑)
        rule.conditions.iter().all(|cond| self.check_condition(request, cond))
    }

    fn check_condition(&self, request: &MockRequest, condition: &Condition) -> bool {
        match condition.target {
            MatchTarget::Uri => request.uri.contains(&condition.matcher),
            MatchTarget::Body => request.body.contains(&condition.matcher),
            MatchTarget::Header => {
                if let Some(key) = &condition.key {
                    request.headers.get(key).map_or(false, |v| v.contains(&condition.matcher))
                } else {
                    // 如果没有指定key,则检查所有header的值
                    request.headers.values().any(|v| v.contains(&condition.matcher))
                }
            }
            MatchTarget::Query => {
                 if let Some(key) = &condition.key {
                    request.query_params.get(key).map_or(false, |v| v.contains(&condition.matcher))
                } else {
                    request.query_params.values().any(|v| v.contains(&condition.matcher))
                }
            }
        }
    }
}

这里的 update_rules 方法是关键。它获取一个写锁,然后用新的规则集替换掉旧的。这个操作本身非常快,因为它只是一个内存指针的替换。持有写锁的时间被缩至最短,最大限度地减少了对正常请求处理的影响。

第二步:集成Zookeeper实现动态更新

现在,我们需要让Rust应用连接到Zookeeper,并监听规则节点的变化。我们使用 zookeeper-async 这个库。

// src/coordinator.rs

use crate::engine::WafEngine;
use crate::rules::RuleSet;
use async_trait::async_trait;
use log::{error, info, warn};
use std::sync::Arc;
use tokio::time::{sleep, Duration};
use zookeeper_async::{Acl, CreateMode, WatchedEvent, Watcher, ZooKeeper};

const ZK_CONNECT_STR: &str = "127.0.0.1:2181";
const RULES_ZNODE_PATH: &str = "/waf/rules";

struct ZkWatcher {
    engine: Arc<WafEngine>,
}

// 实现Watcher trait,处理Zookeeper事件
#[async_trait]
impl Watcher for ZkWatcher {
    async fn handle(&self, event: WatchedEvent) {
        info!("Received Zookeeper event: {:?}", event);
        // 我们只关心节点数据变化事件
        if event.event_type == zookeeper_async::WatchedEventType::NodeDataChanged {
            if let Some(path) = event.path {
                if path == RULES_ZNODE_PATH {
                    // 这里我们不能直接在回调里执行阻塞或长时间的操作
                    // 最好是发送一个信号或消息到另一个任务去处理。
                    // 为简化示例,我们直接在这里重新获取和更新。
                    // 在真实项目中,会把 engine clone() 传给一个新spawn的task。
                    if let Err(e) = self.fetch_and_update_rules().await {
                         error!("Failed to fetch and update rules after notification: {}", e);
                    }
                }
            }
        }
    }
}

impl ZkWatcher {
    // 负责从Zookeeper获取最新规则并更新到WAF引擎
    async fn fetch_and_update_rules(&self) -> Result<(), Box<dyn std::error::Error>> {
        let zk = ZooKeeper::connect(ZK_CONNECT_STR, Duration::from_secs(15), self.clone()).await?;
        
        // 重新获取数据,并再次设置watch
        // watch是一次性的,每次触发后都需要重新设置
        match zk.get_data_w(RULES_ZNODE_PATH, self.clone()).await {
            Ok((bytes, _stat)) => {
                info!("Fetched {} bytes of new rule data from Zookeeper.", bytes.len());
                match RuleSet::from_slice(&bytes) {
                    Ok(rule_set) => {
                        self.engine.update_rules(rule_set);
                        info!("Successfully updated WAF rules.");
                    }
                    Err(e) => {
                        error!("Failed to deserialize rules from Zookeeper data: {}", e);
                    }
                }
            }
            Err(e) => {
                error!("Failed to get data from Zookeeper path '{}': {}", RULES_ZNODE_PATH, e);
            }
        }
        Ok(())
    }
}


pub async fn start_zookeeper_listener(engine: Arc<WafEngine>) {
    loop {
        info!("Attempting to connect to Zookeeper at {}", ZK_CONNECT_STR);
        let watcher = ZkWatcher {
            engine: engine.clone(),
        };

        match ZooKeeper::connect(ZK_CONNECT_STR, Duration::from_secs(15), watcher.clone()).await {
            Ok(zk) => {
                info!("Successfully connected to Zookeeper.");
                // 确保父节点存在
                if zk.exists("/waf", false).await.unwrap().is_none() {
                     zk.create("/waf", &[], Acl::open_unrestricted(), CreateMode::Persistent).await.unwrap();
                }

                // 首次启动时,尝试获取一次规则
                // 同时设置一个watch,以便在规则变更时收到通知
                match zk.get_data_w(RULES_ZNODE_PATH, watcher.clone()).await {
                    Ok((bytes, _stat)) => {
                        if let Ok(rule_set) = RuleSet::from_slice(&bytes) {
                            engine.update_rules(rule_set);
                            info!("Initial rules loaded successfully.");
                        }
                    }
                    Err(zookeeper_async::ZkError::NoNode) => {
                        warn!("Rules node '{}' does not exist yet. Waiting for creation.", RULES_ZNODE_PATH);
                        // 如果节点不存在,也需要设置一个存在的watch来监听创建事件
                        if zk.exists_w(RULES_ZNODE_PATH, watcher).await.is_err() {
                            error!("Failed to set exists watch on rules node.");
                        }
                    }
                    Err(e) => {
                        error!("Failed to perform initial fetch from Zookeeper: {}", e);
                    }
                }
                
                // 连接是会话制的,如果断开,上面的代码会返回错误,外部循环会处理重连
                // 我们需要一个方式来保持这个函数存活,直到会话过期
                let (tx, mut rx) = tokio::sync::mpsc::channel(1);
                let _zk_clone = zk.clone();
                tokio::spawn(async move {
                    // 这个任务在zk会话关闭时结束
                    _zk_clone.on_close().await;
                    let _ = tx.send(()).await;
                });
                rx.recv().await;
                error!("Zookeeper session expired. Reconnecting...");
            }
            Err(e) => {
                error!("Failed to connect to Zookeeper: {}. Retrying in 5 seconds.", e);
            }
        }
        sleep(Duration::from_secs(5)).await;
    }
}

// main.rs
#[tokio::main]
async fn main() {
    // 设置日志
    env_logger::init();
    
    let engine = Arc::new(WafEngine::new());
    
    // 在一个独立的task中运行Zookeeper监听器
    let engine_clone = engine.clone();
    tokio::spawn(async move {
        coordinator::start_zookeeper_listener(engine_clone).await;
    });

    // 这里可以启动一个HTTP服务器来接收真实流量
    // 为简化,我们只模拟请求来测试
    info!("WAF Engine started. Waiting for rules from Zookeeper...");

    // 模拟一个web服务器的循环,每隔几秒处理一个请求
    loop {
        sleep(Duration::from_secs(5)).await;
        let mut headers = HashMap::new();
        headers.insert("user-agent".to_string(), "Test-Client".to_string());
        headers.insert("x-malicious-header".to_string(), "bad-content-123".to_string());
        
        let mut query_params = HashMap::new();
        query_params.insert("id".to_string(), "100".to_string());
        query_params.insert("payload".to_string(), "some-sql-injection".to_string());
        
        let req = MockRequest {
            uri: "/test/path",
            headers,
            query_params,
            body: "{\"username\": \"admin\"}"
        };

        info!("Processing a mock request...");
        let action = engine.process_request(&req);
        info!("Request processed. Action: {:?}", action);
    }
}

这段代码建立了一个健壮的循环,用于连接Zookeeper。一旦连接成功,它会立即尝试获取一次规则,并设置一个watch。当连接断开时,循环会等待5秒后自动重连。Watcher的回调逻辑是核心:当收到数据变更通知后,它会重新获取数据,反序列化成 RuleSet,然后调用 WafEngineupdate_rules 方法。

整个系统的架构如下:

graph TD
    subgraph Control Plane
        A[Flutter Admin UI] -- 1. Operator edits rules --> B{Zookeeper Cluster};
    end
    
    B -- 2. Writes rules to /waf/rules ZNode --> B;
    B -- 3. Notifies watchers --> C1[Rust WAF Node 1];
    B -- 3. Notifies watchers --> C2[Rust WAF Node 2];
    B -- 3. Notifies watchers --> C3[Rust WAF Node N];

    subgraph Data Plane
        C1 -- 4. Fetches new rules --> B;
        C2 -- 4. Fetches new rules --> B;
        C3 -- 4. Fetches new rules --> B;
        
        C1 -- 5. Atomically updates in-memory rules --> C1;
        C2 -- 5. Atomically updates in-memory rules --> C2;
        C3 -- 5. Atomically updates in-memory rules --> C3;
    end
    
    subgraph Traffic Flow
        D[Incoming Traffic] --> E{Load Balancer};
        E --> C1;
        E --> C2;
        E --> C3;
    end

第三步:Flutter管理端

Flutter部分相对直接。我们需要一个界面来管理规则列表,并将它们序列化成JSON推送到Zookeeper。在生产项目中,我们会使用一个原生的Zookeeper Dart库,但为了演示,我们可以通过调用 zkCli.sh 命令行工具来简化这个过程,这在内部工具中是一个可行的权衡。

// lib/rule_manager.dart (伪代码/核心逻辑)
import 'dart:convert';
import 'dart:io';

// Rule, Condition等数据结构需要和Rust侧对应
class Rule {
  int id;
  String description;
  List<Condition> conditions;
  String action; // "Block", "Log", "Allow"
  
  // ... constructors and toJson method
}

class RuleSet {
  int version;
  List<Rule> rules;
  
  Map<String, dynamic> toJson() => {
    'version': version,
    'rules': rules.map((r) => r.toJson()).toList(),
  };
}


class ZookeeperService {
  final String zkCliPath; // e.g., "/path/to/zookeeper/bin/zkCli.sh"
  final String zkServers; // e.g., "127.0.0.1:2181"
  final String rulesNodePath = "/waf/rules";

  ZookeeperService(this.zkCliPath, this.zkServers);

  Future<void> pushRules(RuleSet ruleSet) async {
    // 每次推送都更新版本号
    ruleSet.version = DateTime.now().millisecondsSinceEpoch;
    
    String jsonData = jsonEncode(ruleSet.toJson());

    // 在一个临时文件中写入数据,因为zkCli的set命令对数据长度有限制且处理多行字符串不便
    final tempFile = File('temp_rules.json');
    await tempFile.writeAsString(jsonData);

    // 检查节点是否存在,不存在则创建
    var checkResult = await Process.run(zkCliPath, ['-server', zkServers, 'ls', '/waf']);
    if (!checkResult.stdout.toString().contains('rules')) {
        await Process.run(zkCliPath, ['-server', zkServers, 'create', rulesNodePath, '""']);
    }

    // 使用 set 命令更新 ZNode 的数据
    // 注意:在生产环境中,直接调用命令行工具存在安全和稳定性风险
    // 应该使用原生库进行交互
    var result = await Process.run(
      zkCliPath,
      ['-server', zkServers, 'set', rulesNodePath, await tempFile.readAsString()],
    );

    await tempFile.delete();

    if (result.exitCode != 0) {
      throw Exception("Failed to push rules to Zookeeper: ${result.stderr}");
    }

    print("Successfully pushed rules to Zookeeper.");
  }
}

// 在Flutter的UI部分,你会有一个状态管理对象持有 List<Rule>
// 当用户点击 "Save and Push" 按钮时,调用:
//
// final ruleSet = RuleSet(version: 0, rules: currentRules);
// await zookeeperService.pushRules(ruleSet);

这个Flutter应用就是一个简单的CRUD界面,核心功能是 pushRules 方法。它将当前的规则列表打包成一个 RuleSet 对象,序列化为JSON,然后通过 zkCli.sh 更新到Zookeeper的 /waf/rules 节点。

第四步:测试,测试,还是测试

一个分布式系统的稳定性取决于其最薄弱的环节。

  1. Rust单元测试: 针对 WafEngine 的匹配逻辑编写详尽的单元测试是基础。

    ```rust
    // tests/engine_test.rs
    #[cfg(test)]
    mod tests {

     use super::*; // import from engine.rs
     
     #[test]
     fn test_block_on_malicious_header() {
         let engine = WafEngine::new();
         let rule = Rule {
             id: 1,
             description: "Block malicious header".to_string(),
             conditions: vec
    

  目录