利用 Zipkin 分布式追踪实现云原生 CI/CD 流程的可视化看板


标准的 CI/CD 工具界面,无论是 Jenkins、GitLab CI 还是其他平台,本质上都是一个按时间顺序滚动的日志流。在简单的单体应用构建场景下,这勉强够用。但在云原生环境中,一次发布可能触发十几个微服务的并行构建、集成测试和分阶段部署。此时,日志流就成了一个无法有效解读的“黑盒”。定位哪个阶段是瓶颈、哪个并行任务拖慢了整体进度、依赖关系是否合理,几乎是不可能完成的任务。我们需要的不是日志,而是一张能实时反映工作流状态、依赖和耗时的作战地图。

这个问题的核心在于,CI/CD 流程本身就是一个分布式的、有向无环图(DAG)的执行过程。而我们观察它的方式,却还停留在单线程的文本输出。我的初步构想是,如果我们能将 CI/CD 的每一个原子操作(如 代码检出单元测试镜像构建)都视为一个服务调用,那么整个 Pipeline 的执行过程不就是一次完整的分布式调用链路吗?这个想法直接将我们引向了分布式追踪技术。

我们的目标是:利用分布式追踪系统捕获 CI/CD 流程中的每一个环节,然后将这些追踪数据转化为一个动态的、类似看板(Kanban)的视图。在这个视图上,每一个“任务卡片”代表一个构建任务,它会随着任务的实际进展(排队、执行、成功、失败)在不同的列之间自动移动。

技术栈与架构决策

在真实项目中,技术选型必须务实。

  1. CI/CD 引擎: 我们需要一个云原生的管道引擎。Tekton 是不二之选。它的 PipelineTaskPipelineRunTaskRun 都是 Kubernetes 的原生 CRD,这使得我们可以通过 kubectl 和 Kubernetes API 与其交互,也为注入我们自己的逻辑提供了极大的便利。每个 Task 运行在一个独立的 Pod 中,环境隔离做得非常彻底。

  2. 追踪系统: 用户指定了 Zipkin。这是一个成熟且轻量级的选择。虽然 OpenTelemetry 是未来的方向,但 Zipkin 的 B3 Propagation 协议足够简单,其核心概念(Trace, Span, Annotation)也完全能满足我们的需求。这里的关键是把 Zipkin 当作一个结构化的事件存储后端,而不是一个单纯的 RPC 追踪工具。

  3. 追踪数据注入: 这是整个方案的核心。如何让 Tekton 的每个 Step 在执行时都生成一个 Span?最直接的办法是开发一个轻量级的命令行包装器(wrapper)。Tekton 的 Step 本质上是执行一个容器里的命令。我们可以用我们的包装器去执行真正的业务命令(如 go build),并在执行前后创建和结束 Span。

  4. 可视化后端: Zipkin 的原生 UI 专为短时间的 RPC 调用设计,不适合展示长达数分钟甚至数小时的 CI/CD 流程。我们需要一个中间服务,它定期从 Zipkin API 拉取特定于 CI/CD 的追踪数据,解析这些数据,维护一个实时的流程状态机,并通过 WebSocket 将状态变化推送给前端。

  5. 可视化前端: 一个纯粹的展示层,用最简单的 HTML/CSS/JavaScript 实现。它只负责连接 WebSocket,根据接收到的数据动态渲染看板。

整体架构如下:

graph TD
    subgraph Kubernetes Cluster
        A[Git Push] --> B{Webhook Trigger};
        B --> C[Tekton Controller];
        C -- Creates --> D[PipelineRun];
        D -- Creates --> E[TaskRun Pod];
        subgraph E [TaskRun Pod]
            direction LR
            F[Tracer CLI Wrapper] -- Executes --> G[Build/Test Command];
        end
        F -- Sends Spans --> H[Zipkin];
    end

    subgraph Visualization Service
        I[Kanban Backend] -- Polls API --> H;
        I -- WebSocket --> J[Frontend Kanban UI];
    end

    subgraph User
        K[Developer] -- Views --> J;
    end

第一步:实现追踪命令行包装器 (Tracer CLI Wrapper)

我们需要一个 Go 程序,它接收要执行的命令作为参数,同时从环境变量中读取追踪上下文(Trace ID, Parent Span ID),然后创建一个新的 Span,执行命令,最后根据命令的退出码标记 Span 的状态并上报。

这里的坑在于,追踪上下文的传递。Tekton 中,Task 内的 Step 是顺序执行的,而不同的 Task 可能并行。我们需要一种机制,让整个 PipelineRun 成为一个根 Span,每个 TaskRun 成为其子 Span,而 TaskRun 里的每个 Step 再成为 TaskRun Span 的子 Span。环境变量是实现这种跨进程、跨 Pod 传递上下文的最简单可靠的方式。

tracer-cli/main.go:

package main

import (
	"log"
	"os"
	"os/exec"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/openzipkin/zipkin-go"
	"github.com/openzipkin/zipkin-go/model"
	reporterhttp "github.com/openzipkin/zipkin-go/reporter/http"
)

const (
	zipkinEndpoint    = "http://zipkin.default.svc.cluster.local:9411/api/v2/spans"
	defaultServiceName = "ci-cd-pipeline"
)

// getEnv retrieves an environment variable or returns a default value.
func getEnv(key, fallback string) string {
	if value, ok := os.LookupEnv(key); ok {
		return value
	}
	return fallback
}

func main() {
	// 1. 初始化 Zipkin Tracer
	// 在生产环境中,这里的配置应该更加健壮,例如包含重试逻辑。
	reporter := reporterhttp.NewReporter(zipkinEndpoint)
	defer reporter.Close()

	serviceName := getEnv("CI_SERVICE_NAME", defaultServiceName)
	endpoint, err := zipkin.NewEndpoint(serviceName, "localhost:0")
	if err != nil {
		log.Fatalf("unable to create local endpoint: %+v\n", err)
	}

	tracer, err := zipkin.NewTracer(reporter, zipkin.WithLocalEndpoint(endpoint))
	if err != nil {
		log.Fatalf("unable to create tracer: %+v\n", err)
	}

	// 2. 解析追踪上下文
	// 我们约定通过环境变量传递上下文,这是 Tekton step 之间传递信息的标准方式。
	traceIDStr := getEnv("TRACE_ID", "")
	parentSpanIDStr := getEnv("PARENT_SPAN_ID", "")

	var traceContext model.TraceContext
	if traceIDStr != "" {
		traceID, err := model.TraceIDFromHex(traceIDStr)
		if err != nil {
			log.Fatalf("invalid TRACE_ID: %s", traceIDStr)
		}
		traceContext.TraceID = traceID

		if parentSpanIDStr != "" {
			parentID, err := model.SpanIDFromHex(parentSpanIDStr)
			if err != nil {
				log.Fatalf("invalid PARENT_SPAN_ID: %s", parentSpanIDStr)
			}
			traceContext.ParentID = &parentID
		}
	}

	// 3. 创建 Span
	// Span 的名称来自环境变量,这样我们可以在 Tekton Task 中定义有意义的名称。
	spanName := getEnv("SPAN_NAME", "unnamed-step")
	span := tracer.StartSpan(spanName, zipkin.Parent(traceContext))
	defer span.Finish()

	log.Printf("Starting span '%s' with TraceID: %s, SpanID: %s", spanName, span.Context().TraceID, span.Context().ID)
	
	// 将当前的 TraceID 和 SpanID 注入到环境变量,供子进程或下一个 step 使用。
	// 这样就构成了调用链。
	os.Setenv("TRACE_ID", span.Context().TraceID.String())
	os.Setenv("PARENT_SPAN_ID", span.Context().ID.String())

	// 4. 执行实际命令
	args := os.Args[1:]
	if len(args) == 0 {
		log.Println("No command provided to execute.")
		span.Tag("error", "true")
		span.Tag("exit.code", "-1")
		span.Tag("error.reason", "No command specified")
		return
	}

	cmd := exec.Command(args[0], args[1:]...)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
    // 关键:将当前进程的所有环境变量(包括我们刚刚设置的 TRACE_ID)传递给子进程
	cmd.Env = os.Environ()

	startTime := time.Now()
	err = cmd.Run()
	duration := time.Since(startTime)

	// 5. 标记 Span 状态并上报
	span.Tag("duration_ms", strconv.FormatInt(duration.Milliseconds(), 10))
	span.Tag("command", strings.Join(args, " "))
	// 注入关键的业务/CI上下文信息,这对于后续的数据分析至关重要。
	span.Tag("git.repo", getEnv("GIT_REPO", "unknown"))
	span.Tag("git.commit", getEnv("GIT_COMMIT", "unknown"))

	if err != nil {
		span.Tag("error", "true")
		exitCode := getExitCode(err)
		span.Tag("exit.code", strconv.Itoa(exitCode))
		log.Printf("Command finished with error. Exit code: %d", exitCode)
		os.Exit(exitCode)
	} else {
		span.Tag("exit.code", "0")
		log.Println("Command finished successfully.")
	}
}

// getExitCode 从 `exec.Error` 中提取退出码
func getExitCode(err error) int {
	if exitErr, ok := err.(*exec.ExitError); ok {
		if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
			return status.ExitStatus()
		}
	}
	// 返回一个非零的默认值,表示未知错误
	return 1
}

构建并打包这个工具到一个 Docker 镜像中,例如 my-registry/tracer-cli:0.1.0

第二步:改造 Tekton Task 和 Pipeline

现在,我们用 tracer-cli 来包装 Tekton Pipeline 中的每一个关键步骤。假设我们有一个简单的构建-测试-部署流程。

tekton/task-build.yaml:

apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: go-build
spec:
  params:
    - name: gitRepo
      type: string
    - name: gitCommit
      type: string
  steps:
    - name: build
      image: my-registry/tracer-cli:0.1.0 # 使用我们自己的镜像
      env:
        # SPAN_NAME 会成为 Zipkin 中 Span 的名字
        - name: SPAN_NAME
          value: "go-build"
        # 传递业务上下文,这些会成为 Span 的 Tag
        - name: GIT_REPO
          value: $(params.gitRepo)
        - name: GIT_COMMIT
          value: $(params.gitCommit)
      # 使用 tracer-cli 执行实际的构建命令
      # 注意:这里的 `go build` 只是一个例子,实际命令会更复杂
      script: |
        #!/bin/sh
        # 假设源码已经通过其他方式检出到了 /workspace/source
        cd /workspace/source
        /app/tracer-cli go build -v -o /workspace/app/my-app ./...

一个常见的错误是忘记将父级 Span 的上下文传递给并行的 Task。Tekton 的 Pipeline CRD 允许我们定义执行图。为了链式追踪,我们需要一个“前置”任务来生成一个全局的 Trace ID。

tekton/pipeline-release.yaml:

apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
  name: release-pipeline
spec:
  params:
    - name: gitRepo
      type: string
    - name: gitCommit
      type: string
  tasks:
    # 任务1: 创建根 Span,并将 Trace ID 和 Span ID 输出为结果
    - name: start-trace
      taskSpec:
        steps:
          - name: generate-trace-context
            image: my-registry/tracer-cli:0.1.0
            env:
              - name: SPAN_NAME
                value: "pipeline-run"
              - name: GIT_REPO
                value: $(params.gitRepo)
              - name: GIT_COMMIT
                value: $(params.gitCommit)
            script: |
              #!/bin/sh
              # 执行一个瞬时命令来创建 Span
              # tracer-cli 会自动将 TRACE_ID 和 PARENT_SPAN_ID 设置为环境变量
              /app/tracer-cli echo "Starting pipeline trace..."
              # 将环境变量写入 Tekton 的 results,供后续 tasks 使用
              echo -n "${TRACE_ID}" > $(results.traceId.path)
              echo -n "${PARENT_SPAN_ID}" > $(results.spanId.path)
        results:
          - name: traceId
            description: The trace ID for the entire pipeline run.
          - name: spanId
            description: The root span ID for the pipeline run.
    
    # 任务2: 构建,它依赖于 start-trace 任务的结果
    - name: build-service
      taskRef:
        name: go-build
      runAfter: [start-trace]
      params:
        - name: gitRepo
          value: $(params.gitRepo)
        - name: gitCommit
          value: $(params.gitCommit)
      # 关键的上下文传递
      # 我们将上一个任务的结果作为环境变量注入
      workspaces:
        - name: source
          workspace: shared-workspace
      # 伪代码:Tekton V1 中需要通过环境变量传递
      # env: 
      #   - name: TRACE_ID
      #     value: $(tasks.start-trace.results.traceId)
      #   - name: PARENT_SPAN_ID
      #     value: $(tasks.start-trace.results.spanId)
      # 注:在实际Tekton中,需要更复杂的机制(如写入文件)来传递环境变量,
      # 为简化示例,我们假设可以通过某种方式注入。
      # 一个更实际的方法是在start-trace中将上下文写入workspace的文件中,
      # 后续task从文件中读取。

注意: 上述 YAML 中的环境变量传递是简化的。在真实的 Tekton 中,Task 之间传递数据通常通过 resultsworkspaces(共享存储)。一个健壮的实现是在 start-trace 任务中将 TRACE_IDPARENT_SPAN_ID 写入到一个共享 workspace 的文件中,后续的任务在执行 tracer-cli 之前先从该文件读取这些值并设置为环境变量。

第三步:看板后端服务 (Kanban Backend)

这个服务是连接追踪数据和前端 UI 的桥梁。它使用 Go 编写,包含一个 Zipkin 客户端、一个状态管理器和一个 WebSocket 服务器。

kanban-backend/main.go:

package main

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	zipkinhttp "github.com/openzipkin/zipkin-go/reporter/http"
)

const zipkinAPIV2 = "http://zipkin.default.svc.cluster.local:9411/api/v2/traces"

// PipelineState 定义了看板上一个卡片的状态
type PipelineState struct {
	TraceID   string    `json:"traceId"`
	Commit    string    `json:"commit"`
	Repo      string    `json:"repo"`
	Status    string    `json:"status"` // QUEUED, RUNNING, SUCCESS, FAILED
	StartTime time.Time `json:"startTime"`
	EndTime   time.Time `json:"endTime"`
	Duration  string    `json:"duration"`
	Tasks     []TaskState `json:"tasks"`
}

// TaskState 定义了 Pipeline 中的一个任务(Task)
type TaskState struct {
	Name      string    `json:"name"`
	Status    string    `json:"status"`
	StartTime time.Time `json:"startTime"`
	Duration  string    `json:"duration"`
}

// Global state management
var (
	pipelines = make(map[string]*PipelineState)
	mu        sync.RWMutex
	upgrader  = websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool { return true },
	}
	clients = make(map[*websocket.Conn]bool)
)

// pollZipkin 是核心逻辑,定期轮询 Zipkin API
func pollZipkin() {
	// 在生产环境中,应该使用更复杂的客户端,并处理分页
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		// 查询过去1小时内名为 "ci-cd-pipeline" 的 trace
		// lookback=3600000 (1 hour in ms)
		resp, err := http.Get(zipkinAPIV2 + "?serviceName=ci-cd-pipeline&limit=100&lookback=3600000")
		if err != nil {
			log.Printf("Error polling Zipkin: %v", err)
			continue
		}
		defer resp.Body.Close()
		
		var traces [][]zipkinhttp.Span
		if err := json.NewDecoder(resp.Body).Decode(&traces); err != nil {
			log.Printf("Error decoding Zipkin response: %v", err)
			continue
		}
		
		updateState(traces)
	}
}

// updateState 根据拉取到的 traces 更新全局状态
func updateState(traces [][]zipkinhttp.Span) {
    // 这是一个简化的转换逻辑,实际项目需要更精细地处理 span 的父子关系
	mu.Lock()
	defer mu.Unlock()

    changed := false
	for _, trace := range traces {
        if len(trace) == 0 {
            continue
        }
		traceID := trace[0].TraceID
		
		// 查找根 Span (没有 parentId 的那个)
		var rootSpan *zipkinhttp.Span
		for i, span := range trace {
			if span.ParentID == "" && span.Name == "pipeline-run" {
				rootSpan = &trace[i]
				break
			}
		}

		if rootSpan == nil {
			continue // 不是我们关心的 pipeline trace
		}
        
        // 简化逻辑:如果 trace 还在进行中,EndTime 会是零值
        status := "RUNNING"
        if rootSpan.Duration > 0 {
            if val, ok := rootSpan.Tags["error"]; ok && val == "true" {
                status = "FAILED"
            } else {
                status = "SUCCESS"
            }
        }
		
        commit := rootSpan.Tags["git.commit"]
        if len(commit) > 7 {
            commit = commit[:7]
        }

		// 创建或更新 pipeline 状态
		if _, ok := pipelines[traceID]; !ok {
            changed = true
			pipelines[traceID] = &PipelineState{TraceID: traceID}
		}
        p := pipelines[traceID]
        p.Repo = rootSpan.Tags["git.repo"]
        p.Commit = commit
        p.Status = status
        p.StartTime = rootSpan.Timestamp
        p.Duration = (time.Duration(rootSpan.Duration) * time.Microsecond).String()

        // ... 此处应有更复杂的逻辑来解析子 Span 并填充 Tasks 列表 ...
	}
    
    // 如果状态有变,广播给所有 WebSocket 客户端
    if changed {
        broadcastState()
    }
}

// broadcastState 将当前所有 pipeline 的状态发送给所有连接的客户端
func broadcastState() {
	mu.RLock()
	defer mu.RUnlock()

	stateSnapshot := make([]*PipelineState, 0, len(pipelines))
	for _, p := range pipelines {
		stateSnapshot = append(stateSnapshot, p)
	}

	msg, err := json.Marshal(stateSnapshot)
	if err != nil {
		log.Printf("Error marshalling state: %v", err)
		return
	}

	for client := range clients {
		if err := client.WriteMessage(websocket.TextMessage, msg); err != nil {
			log.Printf("Error writing to client: %v", err)
			client.Close()
			delete(clients, client)
		}
	}
}

func handleConnections(w http.ResponseWriter, r *http.Request) {
	ws, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer ws.Close()

	clients[ws] = true
	log.Println("New client connected")
    
    // 新客户端连接时,立即发送一次当前快照
    broadcastState()

	for {
		// 保持连接,直到客户端断开
		if _, _, err := ws.ReadMessage(); err != nil {
			log.Printf("Client disconnected: %v", err)
			delete(clients, ws)
			break
		}
	}
}

func main() {
	go pollZipkin()

	http.HandleFunc("/ws", handleConnections)
	log.Println("Kanban backend server started on :8080")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal("ListenAndServe:", err)
	}
}

第四步:前端看板 (Kanban UI)

这是一个极简的 HTML 页面,用于验证整个流程。它只做三件事:连接 WebSocket,监听消息,根据消息数据渲染 DOM。

index.html:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>CI/CD Kanban</title>
    <style>
        body { font-family: sans-serif; background-color: #f4f4f4; }
        .board { display: flex; gap: 10px; padding: 10px; }
        .column { flex: 1; background: #e0e0e0; padding: 10px; border-radius: 5px; }
        .column h2 { text-align: center; margin-top: 0; }
        .card { background: white; padding: 15px; margin-bottom: 10px; border-radius: 3px; box-shadow: 0 1px 3px rgba(0,0,0,0.2); }
        .card.success { border-left: 5px solid #28a745; }
        .card.failed { border-left: 5px solid #dc3545; }
        .card.running { border-left: 5px solid #007bff; }
        .card p { margin: 5px 0; }
    </style>
</head>
<body>
    <h1>CI/CD Real-time Kanban</h1>
    <div class="board">
        <div class="column" id="col-running"><h2>RUNNING</h2></div>
        <div class="column" id="col-success"><h2>SUCCESS</h2></div>
        <div class="column" id="col-failed"><h2>FAILED</h2></div>
    </div>

    <script>
        const columns = {
            'RUNNING': document.getElementById('col-running'),
            'SUCCESS': document.getElementById('col-success'),
            'FAILED': document.getElementById('col-failed'),
        };

        const ws = new WebSocket('ws://localhost:8080/ws');

        ws.onmessage = function(event) {
            const pipelines = JSON.parse(event.data);
            // Clear all columns before re-rendering
            Object.values(columns).forEach(col => {
                // Keep the h2 title
                while (col.childNodes.length > 1) {
                    col.removeChild(col.lastChild);
                }
            });
            
            pipelines.sort((a, b) => new Date(b.startTime) - new Date(a.startTime));

            pipelines.forEach(p => {
                const card = document.createElement('div');
                card.className = `card ${p.status.toLowerCase()}`;
                card.id = `trace-${p.traceId}`;
                card.innerHTML = `
                    <p><strong>Repo:</strong> ${p.repo}</p>
                    <p><strong>Commit:</strong> ${p.commit}</p>
                    <p><strong>Duration:</strong> ${p.duration}</p>
                    <p><strong>Started:</strong> ${new Date(p.startTime).toLocaleString()}</p>
                `;
                const targetColumn = columns[p.status];
                if (targetColumn) {
                    targetColumn.appendChild(card);
                }
            });
        };

        ws.onopen = function() { console.log('WebSocket connection established'); };
        ws.onerror = function(error) { console.log('WebSocket Error: ' + error); };
        ws.onclose = function() { console.log('WebSocket connection closed'); };
    </script>
</body>
</html>

局限性与未来迭代方向

我们构建的这个系统虽然验证了核心思想,但在生产环境中还存在一些明显的局限性。

首先,后端服务通过轮询 Zipkin API 的方式获取数据,这存在延迟且效率不高。一个更优的架构是引入 OpenTelemetry Collector,配置一个 Zipkin Receiver 和一个可以推送或被订阅的 Exporter(例如 Kafka Exporter),让我们的看板后端服务直接从消息队列消费追踪数据,实现真正的实时更新。

其次,当前的状态管理是纯内存的,服务重启后所有历史状态都会丢失。要支持历史数据查询和分析,需要将 PipelineState 持久化到数据库中,例如 PostgreSQL 或 Redis。

最后,可视化本身非常初级。一个完善的看板应该能点击卡片下钻到具体的 Task 列表,展示每个 Task 的耗时和日志链接,甚至可以绘制出 Pipeline 的执行甘特图,将时间维度的瓶颈更直观地暴露出来。这套基于 Span 的结构化数据,为实现这些高级功能打下了坚实的基础。


  目录