我们面临一个典型但棘手的运维问题:线上WAF(Web应用防火墙)集群的规则更新。传统的做法是修改配置文件,然后逐台滚动重启服务。在小规模部署下这还能接受,但在拥有数百个网关节点的环境中,这个过程不仅缓慢、风险高,而且短暂的服务中断或延迟都可能对业务造成影响。我们需要一个方案,能够将WAF规则近乎实时地、安全地推送到整个集群,且无需重启任何节点。
最初的构想是建立一个控制平面和数据平面的分离架构。数据平面需要极致的性能和内存安全,毕竟它处理的是所有入口流量,任何一个漏洞或性能瓶颈都将是灾难性的。控制平面则需要一个可靠的、能被所有数据平面节点监听的“真理之源”,以及一个供安全运营人员使用的简单管理界面。
技术选型决策很快就清晰了:
数据平面 - WAF核心引擎: Rust。选择Rust并非为了追赶潮流。在安全领域,C/C++长期是性能的首选,但内存安全问题始终是悬顶之剑。Rust通过其所有权和借用检查机制,在编译期就消除了这类风险,同时提供了与C/C++相媲美的性能。这对于一个处在流量入口、直接暴露在攻击面下的WAF核心来说,是决定性的优势。
配置分发与协调: Zookeeper。我们需要一个成熟、高可用的分布式协调服务。Zookeeper的Watcher机制是这个场景的完美匹配。数据平面节点可以监听(Watch)Zookeeper上的特定ZNode,一旦规则数据发生变更,Zookeeper会立即通知所有监听的节点。这正是我们实现“实时推送”的关键。虽然有etcd等更现代的替代品,但Zookeeper的稳定性和在大型分布式系统中的广泛应用使其成为一个可靠、低风险的选择。
管理界面: Flutter。我们需要一个简单的桌面客户端给运营团队使用,用来增删改查规则。Flutter能够快速构建跨平台(Windows, macOS, Linux)的桌面应用,开发效率高,UI表现力也不错。对于这样一个内部工具,Flutter是成本效益极佳的选择。
测试策略: 这是整个项目的基石。单元测试保证Rust核心逻辑的正确性;集成测试验证Rust节点与Zookeeper的交互;端到端测试则模拟从Flutter界面修改规则到Rust节点生效的全链路流程。
第一步:设计Rust WAF微内核
WAF的核心是规则引擎。我们需要设计一套可序列化的数据结构来定义规则,并在Rust引擎中高效地执行它们。
定义规则结构
我们先用 serde
来定义规则的数据结构,以便后续能用JSON格式在Zookeeper中存储和传输。
// src/rules.rs
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// 规则匹配的目标位置
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum MatchTarget {
Header,
Query,
Body,
Uri,
}
/// 具体的匹配条件
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Condition {
// 匹配目标,例如:Header, Query参数等
pub target: MatchTarget,
// 如果是Header或Query, 此字段指定具体的key
#[serde(default)]
pub key: Option<String>,
// 匹配操作,这里简化为包含子串
pub matcher: String,
}
/// 匹配成功后执行的动作
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum Action {
Block,
Log,
Allow,
}
/// 单条WAF规则
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Rule {
pub id: u32,
pub description: String,
pub conditions: Vec<Condition>,
pub action: Action,
}
/// 规则集,这是我们将在Zookeeper中存储的完整对象
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RuleSet {
pub version: u64,
pub rules: Vec<Rule>,
}
impl RuleSet {
/// 一个简单的辅助函数,用于将规则集序列化为JSON字符串
pub fn to_json(&self) -> Result<String, serde_json::Error> {
serde_json::to_string_pretty(self)
}
/// 从JSON字节流反序列化
pub fn from_slice(bytes: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice(bytes)
}
}
实现规则匹配引擎
引擎的核心职责是接收一个模拟的HTTP请求,并根据当前的规则集进行匹配。这里的关键在于如何管理规则集。规则的更新必须是原子的,并且不能阻塞正在处理请求的线程。Arc<RwLock<T>>
是解决这个问题的标准模式。
Arc
(原子引用计数指针)允许多个线程安全地共享同一份数据。RwLock
(读写锁)则允许多个读线程同时访问数据,但在写操作时会独占访问权。
// src/engine.rs
use crate::rules::{Action, Condition, MatchTarget, Rule, RuleSet};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use log::{info, warn};
// 模拟一个简化的HTTP请求
pub struct MockRequest<'a> {
pub uri: &'a str,
pub headers: HashMap<String, String>,
pub query_params: HashMap<String, String>,
pub body: &'a str,
}
pub struct WafEngine {
// 使用Arc<RwLock<>>来安全地、原子地更新规则集
rule_set: Arc<RwLock<RuleSet>>,
}
impl WafEngine {
pub fn new() -> Self {
Self {
rule_set: Arc::new(RwLock::new(RuleSet::default())),
}
}
/// 核心处理逻辑
/// 接收一个请求,返回最终的WAF动作
pub fn process_request(&self, request: &MockRequest) -> Action {
// 获取规则集的读锁。这允许多个请求并发处理。
// .read().unwrap() 在真实项目中应该有更健壮的错误处理,
// 但对于RwLock,如果持有锁的线程panic,它会变成poisoned,unwrap会panic。
// 在我们的场景下,读锁的持有时间极短,风险很低。
let rules = self.rule_set.read().unwrap();
for rule in &rules.rules {
if self.is_match(request, rule) {
info!("Request matched rule {}: '{}'. Action: {:?}", rule.id, rule.description, rule.action);
// 一旦匹配到规则,立即返回动作
return rule.action.clone();
}
}
// 默认放行
Action::Allow
}
/// 更新规则集。这是被Zookeeper回调调用的函数。
pub fn update_rules(&self, new_rule_set: RuleSet) {
// 获取写锁。此时所有尝试获取读锁或写锁的线程都会被阻塞。
// 操作必须非常快,以避免阻塞处理请求的线程。
let mut guard = self.rule_set.write().unwrap();
info!("Updating rules to version {}. Previous version was {}.", new_rule_set.version, guard.version);
// 直接替换内存中的整个规则集对象,这是一个原子操作。
*guard = new_rule_set;
}
fn is_match(&self, request: &MockRequest, rule: &Rule) -> bool {
// 所有条件都必须满足 (AND逻辑)
rule.conditions.iter().all(|cond| self.check_condition(request, cond))
}
fn check_condition(&self, request: &MockRequest, condition: &Condition) -> bool {
match condition.target {
MatchTarget::Uri => request.uri.contains(&condition.matcher),
MatchTarget::Body => request.body.contains(&condition.matcher),
MatchTarget::Header => {
if let Some(key) = &condition.key {
request.headers.get(key).map_or(false, |v| v.contains(&condition.matcher))
} else {
// 如果没有指定key,则检查所有header的值
request.headers.values().any(|v| v.contains(&condition.matcher))
}
}
MatchTarget::Query => {
if let Some(key) = &condition.key {
request.query_params.get(key).map_or(false, |v| v.contains(&condition.matcher))
} else {
request.query_params.values().any(|v| v.contains(&condition.matcher))
}
}
}
}
}
这里的 update_rules
方法是关键。它获取一个写锁,然后用新的规则集替换掉旧的。这个操作本身非常快,因为它只是一个内存指针的替换。持有写锁的时间被缩至最短,最大限度地减少了对正常请求处理的影响。
第二步:集成Zookeeper实现动态更新
现在,我们需要让Rust应用连接到Zookeeper,并监听规则节点的变化。我们使用 zookeeper-async
这个库。
// src/coordinator.rs
use crate::engine::WafEngine;
use crate::rules::RuleSet;
use async_trait::async_trait;
use log::{error, info, warn};
use std::sync::Arc;
use tokio::time::{sleep, Duration};
use zookeeper_async::{Acl, CreateMode, WatchedEvent, Watcher, ZooKeeper};
const ZK_CONNECT_STR: &str = "127.0.0.1:2181";
const RULES_ZNODE_PATH: &str = "/waf/rules";
struct ZkWatcher {
engine: Arc<WafEngine>,
}
// 实现Watcher trait,处理Zookeeper事件
#[async_trait]
impl Watcher for ZkWatcher {
async fn handle(&self, event: WatchedEvent) {
info!("Received Zookeeper event: {:?}", event);
// 我们只关心节点数据变化事件
if event.event_type == zookeeper_async::WatchedEventType::NodeDataChanged {
if let Some(path) = event.path {
if path == RULES_ZNODE_PATH {
// 这里我们不能直接在回调里执行阻塞或长时间的操作
// 最好是发送一个信号或消息到另一个任务去处理。
// 为简化示例,我们直接在这里重新获取和更新。
// 在真实项目中,会把 engine clone() 传给一个新spawn的task。
if let Err(e) = self.fetch_and_update_rules().await {
error!("Failed to fetch and update rules after notification: {}", e);
}
}
}
}
}
}
impl ZkWatcher {
// 负责从Zookeeper获取最新规则并更新到WAF引擎
async fn fetch_and_update_rules(&self) -> Result<(), Box<dyn std::error::Error>> {
let zk = ZooKeeper::connect(ZK_CONNECT_STR, Duration::from_secs(15), self.clone()).await?;
// 重新获取数据,并再次设置watch
// watch是一次性的,每次触发后都需要重新设置
match zk.get_data_w(RULES_ZNODE_PATH, self.clone()).await {
Ok((bytes, _stat)) => {
info!("Fetched {} bytes of new rule data from Zookeeper.", bytes.len());
match RuleSet::from_slice(&bytes) {
Ok(rule_set) => {
self.engine.update_rules(rule_set);
info!("Successfully updated WAF rules.");
}
Err(e) => {
error!("Failed to deserialize rules from Zookeeper data: {}", e);
}
}
}
Err(e) => {
error!("Failed to get data from Zookeeper path '{}': {}", RULES_ZNODE_PATH, e);
}
}
Ok(())
}
}
pub async fn start_zookeeper_listener(engine: Arc<WafEngine>) {
loop {
info!("Attempting to connect to Zookeeper at {}", ZK_CONNECT_STR);
let watcher = ZkWatcher {
engine: engine.clone(),
};
match ZooKeeper::connect(ZK_CONNECT_STR, Duration::from_secs(15), watcher.clone()).await {
Ok(zk) => {
info!("Successfully connected to Zookeeper.");
// 确保父节点存在
if zk.exists("/waf", false).await.unwrap().is_none() {
zk.create("/waf", &[], Acl::open_unrestricted(), CreateMode::Persistent).await.unwrap();
}
// 首次启动时,尝试获取一次规则
// 同时设置一个watch,以便在规则变更时收到通知
match zk.get_data_w(RULES_ZNODE_PATH, watcher.clone()).await {
Ok((bytes, _stat)) => {
if let Ok(rule_set) = RuleSet::from_slice(&bytes) {
engine.update_rules(rule_set);
info!("Initial rules loaded successfully.");
}
}
Err(zookeeper_async::ZkError::NoNode) => {
warn!("Rules node '{}' does not exist yet. Waiting for creation.", RULES_ZNODE_PATH);
// 如果节点不存在,也需要设置一个存在的watch来监听创建事件
if zk.exists_w(RULES_ZNODE_PATH, watcher).await.is_err() {
error!("Failed to set exists watch on rules node.");
}
}
Err(e) => {
error!("Failed to perform initial fetch from Zookeeper: {}", e);
}
}
// 连接是会话制的,如果断开,上面的代码会返回错误,外部循环会处理重连
// 我们需要一个方式来保持这个函数存活,直到会话过期
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let _zk_clone = zk.clone();
tokio::spawn(async move {
// 这个任务在zk会话关闭时结束
_zk_clone.on_close().await;
let _ = tx.send(()).await;
});
rx.recv().await;
error!("Zookeeper session expired. Reconnecting...");
}
Err(e) => {
error!("Failed to connect to Zookeeper: {}. Retrying in 5 seconds.", e);
}
}
sleep(Duration::from_secs(5)).await;
}
}
// main.rs
#[tokio::main]
async fn main() {
// 设置日志
env_logger::init();
let engine = Arc::new(WafEngine::new());
// 在一个独立的task中运行Zookeeper监听器
let engine_clone = engine.clone();
tokio::spawn(async move {
coordinator::start_zookeeper_listener(engine_clone).await;
});
// 这里可以启动一个HTTP服务器来接收真实流量
// 为简化,我们只模拟请求来测试
info!("WAF Engine started. Waiting for rules from Zookeeper...");
// 模拟一个web服务器的循环,每隔几秒处理一个请求
loop {
sleep(Duration::from_secs(5)).await;
let mut headers = HashMap::new();
headers.insert("user-agent".to_string(), "Test-Client".to_string());
headers.insert("x-malicious-header".to_string(), "bad-content-123".to_string());
let mut query_params = HashMap::new();
query_params.insert("id".to_string(), "100".to_string());
query_params.insert("payload".to_string(), "some-sql-injection".to_string());
let req = MockRequest {
uri: "/test/path",
headers,
query_params,
body: "{\"username\": \"admin\"}"
};
info!("Processing a mock request...");
let action = engine.process_request(&req);
info!("Request processed. Action: {:?}", action);
}
}
这段代码建立了一个健壮的循环,用于连接Zookeeper。一旦连接成功,它会立即尝试获取一次规则,并设置一个watch。当连接断开时,循环会等待5秒后自动重连。Watcher的回调逻辑是核心:当收到数据变更通知后,它会重新获取数据,反序列化成 RuleSet
,然后调用 WafEngine
的 update_rules
方法。
整个系统的架构如下:
graph TD subgraph Control Plane A[Flutter Admin UI] -- 1. Operator edits rules --> B{Zookeeper Cluster}; end B -- 2. Writes rules to /waf/rules ZNode --> B; B -- 3. Notifies watchers --> C1[Rust WAF Node 1]; B -- 3. Notifies watchers --> C2[Rust WAF Node 2]; B -- 3. Notifies watchers --> C3[Rust WAF Node N]; subgraph Data Plane C1 -- 4. Fetches new rules --> B; C2 -- 4. Fetches new rules --> B; C3 -- 4. Fetches new rules --> B; C1 -- 5. Atomically updates in-memory rules --> C1; C2 -- 5. Atomically updates in-memory rules --> C2; C3 -- 5. Atomically updates in-memory rules --> C3; end subgraph Traffic Flow D[Incoming Traffic] --> E{Load Balancer}; E --> C1; E --> C2; E --> C3; end
第三步:Flutter管理端
Flutter部分相对直接。我们需要一个界面来管理规则列表,并将它们序列化成JSON推送到Zookeeper。在生产项目中,我们会使用一个原生的Zookeeper Dart库,但为了演示,我们可以通过调用 zkCli.sh
命令行工具来简化这个过程,这在内部工具中是一个可行的权衡。
// lib/rule_manager.dart (伪代码/核心逻辑)
import 'dart:convert';
import 'dart:io';
// Rule, Condition等数据结构需要和Rust侧对应
class Rule {
int id;
String description;
List<Condition> conditions;
String action; // "Block", "Log", "Allow"
// ... constructors and toJson method
}
class RuleSet {
int version;
List<Rule> rules;
Map<String, dynamic> toJson() => {
'version': version,
'rules': rules.map((r) => r.toJson()).toList(),
};
}
class ZookeeperService {
final String zkCliPath; // e.g., "/path/to/zookeeper/bin/zkCli.sh"
final String zkServers; // e.g., "127.0.0.1:2181"
final String rulesNodePath = "/waf/rules";
ZookeeperService(this.zkCliPath, this.zkServers);
Future<void> pushRules(RuleSet ruleSet) async {
// 每次推送都更新版本号
ruleSet.version = DateTime.now().millisecondsSinceEpoch;
String jsonData = jsonEncode(ruleSet.toJson());
// 在一个临时文件中写入数据,因为zkCli的set命令对数据长度有限制且处理多行字符串不便
final tempFile = File('temp_rules.json');
await tempFile.writeAsString(jsonData);
// 检查节点是否存在,不存在则创建
var checkResult = await Process.run(zkCliPath, ['-server', zkServers, 'ls', '/waf']);
if (!checkResult.stdout.toString().contains('rules')) {
await Process.run(zkCliPath, ['-server', zkServers, 'create', rulesNodePath, '""']);
}
// 使用 set 命令更新 ZNode 的数据
// 注意:在生产环境中,直接调用命令行工具存在安全和稳定性风险
// 应该使用原生库进行交互
var result = await Process.run(
zkCliPath,
['-server', zkServers, 'set', rulesNodePath, await tempFile.readAsString()],
);
await tempFile.delete();
if (result.exitCode != 0) {
throw Exception("Failed to push rules to Zookeeper: ${result.stderr}");
}
print("Successfully pushed rules to Zookeeper.");
}
}
// 在Flutter的UI部分,你会有一个状态管理对象持有 List<Rule>
// 当用户点击 "Save and Push" 按钮时,调用:
//
// final ruleSet = RuleSet(version: 0, rules: currentRules);
// await zookeeperService.pushRules(ruleSet);
这个Flutter应用就是一个简单的CRUD界面,核心功能是 pushRules
方法。它将当前的规则列表打包成一个 RuleSet
对象,序列化为JSON,然后通过 zkCli.sh
更新到Zookeeper的 /waf/rules
节点。
第四步:测试,测试,还是测试
一个分布式系统的稳定性取决于其最薄弱的环节。
Rust单元测试: 针对
WafEngine
的匹配逻辑编写详尽的单元测试是基础。```rust
// tests/engine_test.rs
#[cfg(test)]
mod tests {use super::*; // import from engine.rs #[test] fn test_block_on_malicious_header() { let engine = WafEngine::new(); let rule = Rule { id: 1, description: "Block malicious header".to_string(), conditions: vec