Linux 内核中管道的实现

概述

本文档基于 Linux 2.6.12,从逻辑和源码两个层面说明内核管道(pipe)的实现机制。

逻辑层面的实现

数据结构

1. pipe_inode_info

pipe_inode_info 是管道的核心结构:

// include/linux/pipe_fs_i.h
struct pipe_inode_info {
    wait_queue_head_t wait;           // 单等待队列(读写共用)
    unsigned int nrbufs;              // 非空缓冲区数量
    unsigned int curbuf;              // 当前缓冲区索引
    struct page *tmp_page;            // 临时页
    unsigned int readers;             // 读端计数
    unsigned int writers;             // 写端计数
    unsigned int waiting_readers;     // 等待读端数
    unsigned int waiting_writers;     // 等待写端数
    unsigned int r_counter;           // 读计数器
    unsigned int w_counter;           // 写计数器
    struct fasync_struct *fasync_readers;
    struct fasync_struct *fasync_writers;
    struct pipe_buffer bufs[PIPE_BUFFERS]; // 固定 16 个缓冲槽
};
2. pipe_buffer
struct pipe_buffer {
    struct page *page;
    unsigned int offset, len;
    const struct pipe_buf_operations *ops;
    unsigned int flags;
    unsigned long private;
};

缓冲区容量PIPE_BUFFERS 固定为 16,每槽一页(4KB),总容量约 64KB;不可动态扩展。

缓冲区为环形使用:curbuf 指向当前读槽,nrbufs 表示非空槽数量,写入槽索引计算 (curbuf + nrbufs) & (PIPE_BUFFERS - 1)

核心算法流程

1. 管道创建流程(入口在 arch/x86_64/kernel/sys_x86_64.c,核心逻辑在 fs/pipe.c)
用户空间: pipe(fd[2])
    ↓
系统调用入口: sys_pipe(arch/x86_64/kernel/sys_x86_64.c 包装,调用 fs/pipe.c 的 do_pipe)
    ↓
创建 inode & 分配 pipe_inode_info
    ↓
创建两个 file 结构并安装到 fd[0], fd[1]
2. 数据写入流程
用户空间: write(fd[1], buf, count)
    ↓
系统调用: sys_write
    ↓
文件操作: pipe_write()
    ↓
检查状态:
    - 无读端: SIGPIPE / -EPIPE
    - 缓冲区满: 阻塞或返回 EAGAIN (O_NONBLOCK)
    ↓
写入位置: 写入槽 = (curbuf + nrbufs) & (PIPE_BUFFERS-1)
    - 如需分配页,则 alloc_page
    - copy_from_user 到槽的页
    - 更新 len,必要时 nrbufs++
    ↓
唤醒等待的读者: wake_up_interruptible()
    ↓
返回写入字节数
3. 数据读取流程
用户空间: read(fd[0], buf, count)
    ↓
系统调用: sys_read
    ↓
文件操作: pipe_read()
    ↓
检查状态:
    - 无数据且写端为 0: 返回 0 (EOF)
    - 无数据且写端存在: 阻塞或 EAGAIN
    ↓
读取位置: curbuf 槽,偏移 start/len
    - copy_to_user
    - 槽读空后释放页,curbuf 前移(按位与取模),nrbufs--
    ↓
唤醒等待的写者: wake_up_interruptible()
    ↓
返回读取字节数
4. 管道关闭流程
用户空间: close(fd[0]) 或 close(fd[1])
    ↓
系统调用: SYSCALL_DEFINE1(close, ...)
    ↓
VFS 层: filp_close()
    ↓
文件操作: pipe_release()
    ↓
更新引用计数:
    - 关闭读端: readers--
    - 关闭写端: writers--
    ↓
检查是否需要释放:
    - readers == 0 && writers == 0
    ↓
释放资源:
    - 释放页内存
    - 释放 pipe_inode_info
    - 释放 inode

同步机制

1. 互斥锁(Mutex)

管道使用互斥锁保护关键数据结构:

  • 保护范围:pipe_inode_info 结构
  • 使用场景:读写操作、状态更新
  • 特点:可中断的互斥锁(mutex_lock_interruptible)
2. 等待队列(Wait Queue)

用于实现阻塞 I/O:

  • 读等待队列:当缓冲区为空时,读者阻塞
  • 写等待队列:当缓冲区满时,写者阻塞
  • 唤醒机制:数据到达/空间可用时唤醒
3. 原子操作

使用原子变量保证并发安全:

  • head/tail 更新:使用原子操作
  • 引用计数:readers/writers 使用原子操作

缓冲区管理

1. 页分配策略
  • 初始分配:创建时分配 16 页(64KB)
  • 动态扩展:需要时可以扩展(通过 fcntl)
  • 页回收:读取后可以回收空闲页
2. 数据布局
页结构:
[页0] [页1] [页2] ... [页N-1]
  |     |     |         |
 数据  数据  数据     数据

每个页可以存储 4KB 数据,通过 head/tail 指针管理。

3. 边界处理
  • 循环回绕:head/tail 到达末尾时回到开头
  • 部分写入:跨页边界的写入需要分两次
  • 部分读取:跨页边界的读取需要分两次

源码层面的实现

关键数据结构

1. pipe_inode_info
// 位置: include/linux/pipe_fs_i.h
struct pipe_inode_info {
    wait_queue_head_t wait;           // 单等待队列(读写共用)
    unsigned int nrbufs;              // 非空缓冲区数量
    unsigned int curbuf;              // 当前缓冲区索引
    struct page *tmp_page;            // 临时页
    unsigned int readers;             // 读端计数
    unsigned int writers;             // 写端计数
    unsigned int waiting_readers;     // 等待读端数
    unsigned int waiting_writers;     // 等待写端数
    unsigned int r_counter;           // 读计数器
    unsigned int w_counter;           // 写计数器
    struct fasync_struct *fasync_readers;
    struct fasync_struct *fasync_writers;
    struct pipe_buffer bufs[PIPE_BUFFERS]; // 固定 16 个缓冲槽
};
2. pipe_buffer
// 位置: include/linux/pipe_fs_i.h

struct pipe_buffer {
    struct page *page;                // 页指针
    unsigned int offset, len;          // 偏移和长度
    const struct pipe_buf_operations *ops;  // 操作函数
    unsigned int flags;
    unsigned long private;
};
3. pipe_file_operations(匿名管道读/写端)
// 位置: fs/pipe.c
static struct file_operations read_pipe_fops = {
    .llseek  = no_llseek,         // 管道不支持随机访问
    .read    = pipe_read,         // 读取管道数据
    .readv   = pipe_readv,        // 向量读取
    .write   = bad_pipe_w,        // 读端禁止写
    .poll    = pipe_poll,         // 支持 poll/epoll
    .ioctl   = pipe_ioctl,        // 管道 ioctl
    .release = pipe_read_release, // 读端关闭
    .fasync  = pipe_read_fasync,  // 异步通知(SIGIO)
};

static struct file_operations write_pipe_fops = {
    .llseek  = no_llseek,          // 不支持随机访问
    .read    = bad_pipe_r,         // 写端禁止读
    .write   = pipe_write,         // 写入管道数据
    .writev  = pipe_writev,        // 向量写入
    .poll    = pipe_poll,          // 支持 poll/epoll
    .ioctl   = pipe_ioctl,         // 管道 ioctl
    .release = pipe_write_release, // 写端关闭
    .fasync  = pipe_write_fasync,  // 异步通知(SIGIO)
};

关键函数实现

1. pipe() 系统调用入口与核心(以 x86_64 为例)
  • 系统调用入口:arch/x86_64/kernel/sys_x86_64.c 中的 sys_pipe,负责获取用户参数并调用通用实现,典型代码:
asmlinkage long sys_pipe(unsigned long __user *fildes)
{
    int fd[2];
    int error = do_pipe(fd);          // 核心在 fs/pipe.c

    if (!error && copy_to_user(fildes, fd, sizeof(fd)))
        error = -EFAULT;
    return error;
}
  • 核心逻辑:fs/pipe.c 中的 do_pipe() 完成管道创建与 fd 安装。

关键步骤(do_pipe)

  1. 分配两个空闲 fd(get_unused_fd
  2. 获取管道 inode(get_pipe_inodepipe_newnew_inode),初始化 pipe_inode_info、等待队列/计数器,设置 i_pipei_fop=rdwr_pipe_fops
  3. 创建读/写端 file 结构并设置 f_opread_pipe_fops / write_pipe_fops
  4. 安装到 fd[0] / fd[1],返回用户态
2. get_pipe_inode() / pipe_new()
  • 位置: fs/pipe.c
  • 作用: 分配并初始化管道 inode 与 pipe_inode_info,设置 i_pipei_fop=rdwr_pipe_fops,初始 readers/writers 为 1。
static struct inode * get_pipe_inode(void)
{
    struct inode *inode = new_inode(pipe_mnt->mnt_sb);   // 分配 pipefs 上的 inode
    if (!inode)
        return NULL;

    if (!pipe_new(inode))                                // kmalloc 清零 pipe_inode_info,init wait/计数
        goto fail;

    PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 1;     // 初始读写端计数
    inode->i_fop = &rdwr_pipe_fops;                      // 读写端共享的 fops(决定后续 split 到读/写 fops)

    inode->i_state = I_DIRTY;                            // 标脏,避免后续 mark_inode_dirty
    inode->i_mode  = S_IFIFO | S_IRUSR | S_IWUSR;        // 类型与权限
    inode->i_uid   = current->fsuid;
    inode->i_gid   = current->fsgid;
    inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
    inode->i_blksize = PAGE_SIZE;
    return inode;
fail:
    iput(inode);
    return NULL;
}
3. pipe_write() 代码示例
static ssize_t pipe_write(struct file *filp, const char __user *buf,
                          size_t count, loff_t *ppos)
{
    struct inode *inode = filp->f_dentry->d_inode;
    struct pipe_inode_info *pipe = inode->i_pipe;
    ssize_t ret = 0;

    while (count > 0) {
        int space, idx;

        if (!pipe->readers)
            return ret ? ret : -EPIPE;

        space = PIPE_BUFFERS - pipe->nrbufs;
        if (space) {
            idx = (pipe->curbuf + pipe->nrbufs) & (PIPE_BUFFERS - 1);

            if (!pipe->bufs[idx].page) {
                pipe->bufs[idx].page = alloc_page(GFP_USER);
                if (!pipe->bufs[idx].page)
                    return ret ? ret : -ENOMEM;
                pipe->bufs[idx].ops = &anon_pipe_buf_ops;
                pipe->bufs[idx].offset = 0;
                pipe->bufs[idx].len = 0;
            }

            space = min_t(int, PAGE_SIZE - pipe->bufs[idx].len, count);
            if (copy_from_user(page_address(pipe->bufs[idx].page) +
                               pipe->bufs[idx].len, buf, space))
                return ret ? ret : -EFAULT;

            pipe->bufs[idx].len += space;
            buf += space;
            count -= space;
            ret += space;

            if (pipe->bufs[idx].len == PAGE_SIZE)
                pipe->nrbufs++;

            wake_up_interruptible(&pipe->wait);
            continue;
        }

        if (filp->f_flags & O_NONBLOCK)
            return ret ? ret : -EAGAIN;

        pipe->waiting_writers++;
        pipe_wait(inode);
        pipe->waiting_writers--;
    }
    return ret;
}
4. pipe_read() 代码示例
static ssize_t pipe_read(struct file *filp, char __user *buf,
                         size_t count, loff_t *ppos)
{
    struct inode *inode = filp->f_dentry->d_inode;
    struct pipe_inode_info *pipe = inode->i_pipe;
    ssize_t ret = 0;

    while (count > 0) {
        int idx, avail;

        if (pipe->nrbufs == 0) {
            if (!pipe->writers)
                break;                     // EOF
            if (filp->f_flags & O_NONBLOCK)
                return ret ? ret : -EAGAIN;
            pipe_wait(inode);
            continue;
        }

        idx = pipe->curbuf;
        avail = min_t(int, pipe->bufs[idx].len - pipe->start, count);

        if (copy_to_user(buf,
            page_address(pipe->bufs[idx].page) + pipe->start, avail))
            return ret ? ret : -EFAULT;

        pipe->start += avail;
        buf += avail;
        count -= avail;
        ret += avail;

        if (pipe->start >= pipe->bufs[idx].len) {
            __free_page(pipe->bufs[idx].page);
            pipe->bufs[idx].page = NULL;
            pipe->bufs[idx].len = pipe->bufs[idx].offset = 0;
            pipe->curbuf = (pipe->curbuf + 1) & (PIPE_BUFFERS - 1);
            pipe->nrbufs--;
            pipe->start = 0;
        }

        wake_up_interruptible(&pipe->wait);
        if (count == 0)
            break;
    }
    return ret;
}
5. pipe_release 系列
// 位置: fs/pipe.c
static int pipe_release(struct inode *inode, int decr, int decw)
{
    down(PIPE_SEM(*inode));                     // 获取 pipe 信号量
    PIPE_READERS(*inode) -= decr;               // 更新读端计数
    PIPE_WRITERS(*inode) -= decw;               // 更新写端计数
    if (!PIPE_READERS(*inode) && !PIPE_WRITERS(*inode)) {
        free_pipe_info(inode);                  // 最后一个端关闭,释放资源
    } else {
        wake_up_interruptible(PIPE_WAIT(*inode));               // 唤醒阻塞的对端
        kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN);   // 通知读端
        kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT);  // 通知写端
    }
    up(PIPE_SEM(*inode));                       // 释放信号量
    return 0;
}

static int pipe_read_release(struct inode *inode, struct file *filp)
{
    pipe_read_fasync(-1, filp, 0);              // 取消异步通知
    return pipe_release(inode, 1, 0);
}

static int pipe_write_release(struct inode *inode, struct file *filp)
{
    pipe_write_fasync(-1, filp, 0);             // 取消异步通知
    return pipe_release(inode, 0, 1);
}

static int pipe_rdwr_release(struct inode *inode, struct file *filp)
{
    int decr = (filp->f_mode & FMODE_READ) != 0;
    int decw = (filp->f_mode & FMODE_WRITE) != 0;
    pipe_rdwr_fasync(-1, filp, 0);              // 取消异步通知
    return pipe_release(inode, decr, decw);
}

关键实现细节

等待与唤醒

  • 单等待队列 wait(PIPE_WAIT 宏),读空或写满时进入 pipe_wait(inode),释放 i_sem、睡眠,事件触发后唤醒并重新持有。

内存管理与性能要点

  • 缓冲槽固定 16 页,总容量约 64KB,不支持运行时扩展。
  • 槽读空即释放页,避免多余占用;写侧必要时分配页。
  • 等待/唤醒依赖单等待队列,持有 i_sem 后再检查条件。

关键源码文件

主要文件

  1. fs/pipe.c

    • 管道的主要实现文件
    • 包含 pipe()、pipe_read()、pipe_write() 等函数
  2. include/linux/pipe_fs_i.h

    • 管道相关的数据结构定义
    • pipe_inode_info、pipe_buffer 等
  3. include/linux/pipe_fs_i.h

    • 管道文件系统的接口定义

相关系统调用

  • sys_pipe:创建管道
  • sys_read / sys_write:通过 VFS 读写
  • sys_close:关闭文件描述符

调试和监控

内核调试

# 查看管道统计信息
cat /proc/sys/fs/pipe-max-size
cat /proc/sys/fs/pipe-user-pages-hard
cat /proc/sys/fs/pipe-user-pages-soft

使用 ftrace 跟踪

# 启用管道相关函数跟踪
echo pipe_* > /sys/kernel/debug/tracing/set_ftrace_filter
echo function > /sys/kernel/debug/tracing/current_tracer

使用 perf 分析

# 分析管道相关系统调用
perf trace -e pipe,read,write

总结

Linux 内核中管道的实现具有以下特点:

  1. 数据结构

    • 使用 pipe_inode_info 作为核心数据结构
    • 使用环形缓冲区存储数据
    • 使用页(page)作为基本存储单元
  2. 同步机制

    • 互斥锁保护关键数据结构
    • 等待队列实现阻塞 I/O
    • 原子操作保证并发安全
  3. 性能优化

    • 位运算优化索引计算
    • 批量操作减少系统调用
    • 零拷贝技术(splice)
  4. 内存管理

    • 默认 16 页(64KB)缓冲区
    • 动态扩展和回收
    • 用户内存限制检查
  5. 错误处理

    • SIGPIPE 信号处理
    • 非阻塞模式支持
    • 资源限制检查

扩展阅读

  • Linux 内核源码:fs/pipe.c
  • Linux 内核文档:Documentation/filesystems/pipefs.txt
  • man 2 pipe
  • man 7 pipe
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐