Go语言实战案例 — 工具开发篇:编写高可用日志收集脚本
这篇文章展示了如何用 Go 快速实现一个可靠、可扩展的日志收集脚本:从文件采集、切割处理,到批量发送与重试策略,都给出了实际可运行的示例代码。实现中充分利用了 Go 的并发、channel 与 context,代码简洁、易扩展。把这个 agent 打包部署在每台节点上,就能为后端日志聚合系统提供稳定可靠的数据源。
·
在分布式系统和微服务架构中,日志是排查问题、审计行为、监控状态的重要来源。本篇作为《Go语言100个实战案例》中的一篇,带你从设计到实现,完整写出一个轻量级、高可用的日志收集脚本(Agent),能够实时采集多个本地日志文件、处理文件切割(rotation)、按批发送到远端聚合服务,并具备重试、限流和优雅停止能力。
目标与场景
目标:实现一个“可部署到每台机器上”的日志采集脚本(agent),功能包括:
- • 监控并 tail 多个指定日志文件(支持通配符)
- • 处理日志切割(rotation)场景(无需丢失数据)
- • 将日志按批次、JSON 格式发送到远端 HTTP 接收端(可替换为 Kafka/gRPC)
- • 支持并发、限流、指数退避重试和本地缓冲
- • 可优雅停止并保证数据尽可能送达
适用场景:小型到中型集群的轻量采集、调试环境、或作为自研日志管道的一部分。
技术选型(简要)
- • 语言:Go(并发模型天然适合)
- • 文件 tail:
github.com/hpcloud/tail
(成熟、支持 rotation)——也可用fsnotify
+ 自实现 tail,但 hpcloud/tail 工具成熟、代码量少 - • 网络传输:HTTP POST + gzip + JSON(易于接入)
- • 配置:命令行 flags + 简单 JSON/YAML(本文用 flags)
- • 重试策略:指数退避(带上限)
注:示例使用
hpcloud/tail
来可靠处理文件 truncation/rotation,实际生产可替换为更复杂的 offset 存储(保证断点续传)
项目结构(示意)
log-agent/
├─ main.go
├─ sender.go
├─ tailer.go
├─ go.mod
下面直接给出一个 单文件(main.go) 的可运行示例,方便快速理解与使用。
完整代码(main.go)
// main.go
package main
import (
"bufio"
"bytes"
"compress/gzip"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"
"github.com/hpcloud/tail"
)
// LogRecord 定义发送到服务器的 JSON 结构
type LogRecord struct {
Timestamp time.Time `json:"timestamp"`
Host string `json:"host"`
Path string `json:"path"`
Line string `json:"line"`
}
// Config
var (
globPattern = flag.String("paths", "/var/log/*.log", "日志文件路径,支持通配符")
endpoint = flag.String("endpoint", "http://127.0.0.1:8080/ingest", "日志收集服务地址")
batchSize = flag.Int("batch", 200, "每次发送最大条数")
batchWait = flag.Duration("wait", 2*time.Second, "批量发送最大等待时间")
workers = flag.Int("workers", 4, "并发发送 worker 数")
maxQueue = flag.Int("queue", 2000, "本地队列最大条数,超出丢弃最老")
)
func main() {
flag.Parse()
host, _ := os.Hostname()
paths, err := filepath.Glob(*globPattern)
if err != nil {
fmt.Fprintf(os.Stderr, "invalid pattern: %v\n", err)
os.Exit(1)
}
if len(paths) == 0 {
fmt.Fprintf(os.Stderr, "no logs matched pattern: %s\n", *globPattern)
os.Exit(1)
}
// context for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// signal handling
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigc
fmt.Println("received shutdown signal, stopping...")
cancel()
}()
// central channel for lines
lineCh := make(chan LogRecord, *maxQueue)
var wg sync.WaitGroup
// start tailers
for _, p := range paths {
wg.Add(1)
go func(path string) {
defer wg.Done()
if err := tailFile(ctx, path, host, lineCh); err != nil {
fmt.Fprintf(os.Stderr, "tail %s error: %v\n", path, err)
}
}(p)
}
// start sender workers
senderWg := &sync.WaitGroup{}
for i := 0; i < *workers; i++ {
senderWg.Add(1)
go func(id int) {
defer senderWg.Done()
runSender(ctx, id, lineCh, *endpoint, *batchSize, *batchWait)
}(i)
}
// wait for tailers to finish (on ctx cancel they will exit)
wg.Wait()
// close channel to signal senders to flush and exit
close(lineCh)
// wait for senders to finish
senderWg.Wait()
fmt.Println("agent stopped")
}
// tailFile 使用 hpcloud/tail 跟踪文件
func tailFile(ctx context.Context, path, host string, out chan<- LogRecord) error {
cfg := tail.Config{
Follow: true,
ReOpen: true, // 支持日志切割后重新打开
MustExist: false,
Poll: true,
Logger: tail.DiscardingLogger,
}
t, err := tail.TailFile(path, cfg)
if err != nil {
return err
}
defer t.Cleanup()
for {
select {
case <-ctx.Done():
t.Cleanup()
return nil
case line, ok := <-t.Lines:
if !ok {
// channel closed; end
return nil
}
if line == nil {
continue
}
rec := LogRecord{
Timestamp: time.Now().UTC(),
Host: host,
Path: path,
Line: line.Text,
}
// non-blocking send to avoid blocking tail; drop oldest if full
select {
case out <- rec:
default:
// drop one and push new (simple policy)
select {
case <-out:
default:
}
select {
case out <- rec:
default:
// give up if still full
}
}
}
}
}
// runSender 聚合并发送日志,带简单重试
func runSender(ctx context.Context, id int, in <-chan LogRecord, endpoint string, batchSize int, batchWait time.Duration) {
httpClient := &http.Client{
Timeout: 10 * time.Second,
}
buf := make([]LogRecord, 0, batchSize)
sendBatch := func(batch []LogRecord) error {
if len(batch) == 0 {
return nil
}
// marshal
data, err := json.Marshal(batch)
if err != nil {
return err
}
// gzip body
var b bytes.Buffer
gw := gzip.NewWriter(&b)
if _, err := gw.Write(data); err != nil {
_ = gw.Close()
return err
}
_ = gw.Close()
req, _ := http.NewRequest("POST", endpoint, &b)
req.Header.Set("Content-Encoding", "gzip")
req.Header.Set("Content-Type", "application/json")
// retry with exponential backoff
var attempt int
for {
attempt++
resp, err := httpClient.Do(req)
if err == nil {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return nil
}
err = fmt.Errorf("bad status: %s", resp.Status)
}
// on ctx done, abort immediately
select {
case <-ctx.Done():
return fmt.Errorf("context canceled")
default:
}
if attempt >= 5 {
return err
}
// backoff
sleep := time.Duration(500*(1<<uint(attempt-1))) * time.Millisecond
if sleep > 10*time.Second {
sleep = 10 * time.Second
}
time.Sleep(sleep)
}
}
timer := time.NewTimer(batchWait)
defer timer.Stop()
for {
select {
case <-ctx.Done():
// flush remaining
_ = sendBatch(buf)
return
case rec, ok := <-in:
if !ok {
// channel closed -> flush and exit
_ = sendBatch(buf)
return
}
buf = append(buf, rec)
if len(buf) >= batchSize {
_ = sendBatch(buf)
buf = buf[:0]
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(batchWait)
}
case <-timer.C:
if len(buf) > 0 {
_ = sendBatch(buf)
buf = buf[:0]
}
timer.Reset(batchWait)
}
}
}
使用方法
- 1. 初始化模块并获取依赖:
go mod init example.com/log-agent
go get github.com/hpcloud/tail
go build -o log-agent main.go
- 2. 运行(示例):
./log-agent -paths "/var/log/myapp/*.log" -endpoint "http://log-collector:8080/ingest" -batch 100 -workers 4
- 3. 建议把 agent 用 systemd 管理或容器化部署为 DaemonSet(K8s)或 sidecar。
实践要点与注意事项
- • 日志切割:使用
ReOpen: true
可处理logrotate
产生的新文件句柄;生产环境建议结合 inode 校验与持久化 offset(例如把 offset 存到本地文件或 SQLite)以支持重启断点续传。 - • 传输安全:生产环境使用 HTTPS + 鉴权(API Key / mTLS)来防止日志被窃取或篡改。
- • 后端吞吐:发送端需要限流与批次控制,避免短时间内把流量拉爆目标端。也可以使用本地磁盘队列(如 diskqueue)在网络中断时持久化缓存。
- • 结构化日志:尽量让应用输出结构化 JSON 日志,这样聚合与查询更强。若是 plain text,可在 agent 处做简单解析(regex)或转发原始行。
- • 监控与自检:给 agent 加入心跳/metrics(Prometheus)接口,监控发送失败数、队列长度等关键指标。
- • 日志隐私:注意日志中可能包含敏感数据(PII、密码、token),可在 agent 端进行脱敏或过滤再上报。
进一步改进(思路)
- • 使用持久化队列(disk-backed)保证断网或进程崩溃后不丢日志。
- • 支持多种传输后端:Kafka、gRPC、AWS S3、Elasticsearch 等。
- • 支持日志标签(service、env、pod)自动注入(从系统 / 环境变量获取)。
- • 增加插件化解析器(nginx、app custom parser)做字段抽取。
- • 通过 Web UI 或配置中心动态下发采集规则。
总结
这篇文章展示了如何用 Go 快速实现一个可靠、可扩展的日志收集脚本:从文件采集、切割处理,到批量发送与重试策略,都给出了实际可运行的示例代码。实现中充分利用了 Go 的并发、channel 与 context,代码简洁、易扩展。把这个 agent 打包部署在每台节点上,就能为后端日志聚合系统提供稳定可靠的数据源。
更多推荐
所有评论(0)