在分布式系统和微服务架构中,日志是排查问题、审计行为、监控状态的重要来源。本篇作为《Go语言100个实战案例》中的一篇,带你从设计到实现,完整写出一个轻量级、高可用的日志收集脚本(Agent),能够实时采集多个本地日志文件、处理文件切割(rotation)、按批发送到远端聚合服务,并具备重试、限流和优雅停止能力。


目标与场景

目标:实现一个“可部署到每台机器上”的日志采集脚本(agent),功能包括:

  • • 监控并 tail 多个指定日志文件(支持通配符)
  • • 处理日志切割(rotation)场景(无需丢失数据)
  • • 将日志按批次、JSON 格式发送到远端 HTTP 接收端(可替换为 Kafka/gRPC)
  • • 支持并发、限流、指数退避重试和本地缓冲
  • • 可优雅停止并保证数据尽可能送达

适用场景:小型到中型集群的轻量采集、调试环境、或作为自研日志管道的一部分。


技术选型(简要)

  • • 语言:Go(并发模型天然适合)
  • • 文件 tail:github.com/hpcloud/tail(成熟、支持 rotation)——也可用 fsnotify + 自实现 tail,但 hpcloud/tail 工具成熟、代码量少
  • • 网络传输:HTTP POST + gzip + JSON(易于接入)
  • • 配置:命令行 flags + 简单 JSON/YAML(本文用 flags)
  • • 重试策略:指数退避(带上限)

注:示例使用 hpcloud/tail 来可靠处理文件 truncation/rotation,实际生产可替换为更复杂的 offset 存储(保证断点续传)


项目结构(示意)

log-agent/
├─ main.go
├─ sender.go
├─ tailer.go
├─ go.mod

下面直接给出一个 单文件(main.go) 的可运行示例,方便快速理解与使用。


完整代码(main.go)

// main.go
package main

import (
    "bufio"
    "bytes"
    "compress/gzip"
    "context"
    "encoding/json"
    "flag"
    "fmt"
    "io"
    "net/http"
    "os"
    "os/signal"
    "path/filepath"
    "sync"
    "syscall"
    "time"

    "github.com/hpcloud/tail"
)

// LogRecord 定义发送到服务器的 JSON 结构
type LogRecord struct {
    Timestamp time.Time `json:"timestamp"`
    Host      string    `json:"host"`
    Path      string    `json:"path"`
    Line      string    `json:"line"`
}

// Config
var (
    globPattern = flag.String("paths", "/var/log/*.log", "日志文件路径,支持通配符")
    endpoint    = flag.String("endpoint", "http://127.0.0.1:8080/ingest", "日志收集服务地址")
    batchSize   = flag.Int("batch", 200, "每次发送最大条数")
    batchWait   = flag.Duration("wait", 2*time.Second, "批量发送最大等待时间")
    workers     = flag.Int("workers", 4, "并发发送 worker 数")
    maxQueue    = flag.Int("queue", 2000, "本地队列最大条数,超出丢弃最老")
)

func main() {
    flag.Parse()

    host, _ := os.Hostname()

    paths, err := filepath.Glob(*globPattern)
    if err != nil {
        fmt.Fprintf(os.Stderr, "invalid pattern: %v\n", err)
        os.Exit(1)
    }
    if len(paths) == 0 {
        fmt.Fprintf(os.Stderr, "no logs matched pattern: %s\n", *globPattern)
        os.Exit(1)
    }

    // context for graceful shutdown
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // signal handling
    sigc := make(chan os.Signal, 1)
    signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigc
        fmt.Println("received shutdown signal, stopping...")
        cancel()
    }()

    // central channel for lines
    lineCh := make(chan LogRecord, *maxQueue)

    var wg sync.WaitGroup

    // start tailers
    for _, p := range paths {
        wg.Add(1)
        go func(path string) {
            defer wg.Done()
            if err := tailFile(ctx, path, host, lineCh); err != nil {
                fmt.Fprintf(os.Stderr, "tail %s error: %v\n", path, err)
            }
        }(p)
    }

    // start sender workers
    senderWg := &sync.WaitGroup{}
    for i := 0; i < *workers; i++ {
        senderWg.Add(1)
        go func(id int) {
            defer senderWg.Done()
            runSender(ctx, id, lineCh, *endpoint, *batchSize, *batchWait)
        }(i)
    }

    // wait for tailers to finish (on ctx cancel they will exit)
    wg.Wait()
    // close channel to signal senders to flush and exit
    close(lineCh)
    // wait for senders to finish
    senderWg.Wait()

    fmt.Println("agent stopped")
}

// tailFile 使用 hpcloud/tail 跟踪文件
func tailFile(ctx context.Context, path, host string, out chan<- LogRecord) error {
    cfg := tail.Config{
        Follow:    true,
        ReOpen:    true, // 支持日志切割后重新打开
        MustExist: false,
        Poll:      true,
        Logger:    tail.DiscardingLogger,
    }
    t, err := tail.TailFile(path, cfg)
    if err != nil {
        return err
    }
    defer t.Cleanup()

    for {
        select {
        case <-ctx.Done():
            t.Cleanup()
            return nil
        case line, ok := <-t.Lines:
            if !ok {
                // channel closed; end
                return nil
            }
            if line == nil {
                continue
            }
            rec := LogRecord{
                Timestamp: time.Now().UTC(),
                Host:      host,
                Path:      path,
                Line:      line.Text,
            }
            // non-blocking send to avoid blocking tail; drop oldest if full
            select {
            case out <- rec:
            default:
                // drop one and push new (simple policy)
                select {
                case <-out:
                default:
                }
                select {
                case out <- rec:
                default:
                    // give up if still full
                }
            }
        }
    }
}

// runSender 聚合并发送日志,带简单重试
func runSender(ctx context.Context, id int, in <-chan LogRecord, endpoint string, batchSize int, batchWait time.Duration) {
    httpClient := &http.Client{
        Timeout: 10 * time.Second,
    }
    buf := make([]LogRecord, 0, batchSize)

    sendBatch := func(batch []LogRecord) error {
        if len(batch) == 0 {
            return nil
        }
        // marshal
        data, err := json.Marshal(batch)
        if err != nil {
            return err
        }
        // gzip body
        var b bytes.Buffer
        gw := gzip.NewWriter(&b)
        if _, err := gw.Write(data); err != nil {
            _ = gw.Close()
            return err
        }
        _ = gw.Close()

        req, _ := http.NewRequest("POST", endpoint, &b)
        req.Header.Set("Content-Encoding", "gzip")
        req.Header.Set("Content-Type", "application/json")
        // retry with exponential backoff
        var attempt int
        for {
            attempt++
            resp, err := httpClient.Do(req)
            if err == nil {
                io.Copy(io.Discard, resp.Body)
                resp.Body.Close()
                if resp.StatusCode >= 200 && resp.StatusCode < 300 {
                    return nil
                }
                err = fmt.Errorf("bad status: %s", resp.Status)
            }
            // on ctx done, abort immediately
            select {
            case <-ctx.Done():
                return fmt.Errorf("context canceled")
            default:
            }
            if attempt >= 5 {
                return err
            }
            // backoff
            sleep := time.Duration(500*(1<<uint(attempt-1))) * time.Millisecond
            if sleep > 10*time.Second {
                sleep = 10 * time.Second
            }
            time.Sleep(sleep)
        }
    }

    timer := time.NewTimer(batchWait)
    defer timer.Stop()

    for {
        select {
        case <-ctx.Done():
            // flush remaining
            _ = sendBatch(buf)
            return
        case rec, ok := <-in:
            if !ok {
                // channel closed -> flush and exit
                _ = sendBatch(buf)
                return
            }
            buf = append(buf, rec)
            if len(buf) >= batchSize {
                _ = sendBatch(buf)
                buf = buf[:0]
                if !timer.Stop() {
                    select {
                    case <-timer.C:
                    default:
                    }
                }
                timer.Reset(batchWait)
            }
        case <-timer.C:
            if len(buf) > 0 {
                _ = sendBatch(buf)
                buf = buf[:0]
            }
            timer.Reset(batchWait)
        }
    }
}

使用方法

  1. 1. 初始化模块并获取依赖:
go mod init example.com/log-agent
go get github.com/hpcloud/tail
go build -o log-agent main.go
  1. 2. 运行(示例):
./log-agent -paths "/var/log/myapp/*.log" -endpoint "http://log-collector:8080/ingest" -batch 100 -workers 4
  1. 3. 建议把 agent 用 systemd 管理或容器化部署为 DaemonSet(K8s)或 sidecar。

实践要点与注意事项

  • • 日志切割:使用 ReOpen: true 可处理 logrotate 产生的新文件句柄;生产环境建议结合 inode 校验与持久化 offset(例如把 offset 存到本地文件或 SQLite)以支持重启断点续传。
  • • 传输安全:生产环境使用 HTTPS + 鉴权(API Key / mTLS)来防止日志被窃取或篡改。
  • • 后端吞吐:发送端需要限流与批次控制,避免短时间内把流量拉爆目标端。也可以使用本地磁盘队列(如 diskqueue)在网络中断时持久化缓存。
  • • 结构化日志:尽量让应用输出结构化 JSON 日志,这样聚合与查询更强。若是 plain text,可在 agent 处做简单解析(regex)或转发原始行。
  • • 监控与自检:给 agent 加入心跳/metrics(Prometheus)接口,监控发送失败数、队列长度等关键指标。
  • • 日志隐私:注意日志中可能包含敏感数据(PII、密码、token),可在 agent 端进行脱敏或过滤再上报。

进一步改进(思路)

  • • 使用持久化队列(disk-backed)保证断网或进程崩溃后不丢日志。
  • • 支持多种传输后端:Kafka、gRPC、AWS S3、Elasticsearch 等。
  • • 支持日志标签(service、env、pod)自动注入(从系统 / 环境变量获取)。
  • • 增加插件化解析器(nginx、app custom parser)做字段抽取。
  • • 通过 Web UI 或配置中心动态下发采集规则。

总结

这篇文章展示了如何用 Go 快速实现一个可靠、可扩展的日志收集脚本:从文件采集、切割处理,到批量发送与重试策略,都给出了实际可运行的示例代码。实现中充分利用了 Go 的并发、channel 与 context,代码简洁、易扩展。把这个 agent 打包部署在每台节点上,就能为后端日志聚合系统提供稳定可靠的数据源。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐