Linux进程间通信之管道(pipe)实现篇
·
Linux 内核中管道的实现
概述
本文档基于 Linux 2.6.12,从逻辑和源码两个层面说明内核管道(pipe)的实现机制。
逻辑层面的实现
数据结构
1. pipe_inode_info
pipe_inode_info 是管道的核心结构:
// include/linux/pipe_fs_i.h
struct pipe_inode_info {
wait_queue_head_t wait; // 单等待队列(读写共用)
unsigned int nrbufs; // 非空缓冲区数量
unsigned int curbuf; // 当前缓冲区索引
struct page *tmp_page; // 临时页
unsigned int readers; // 读端计数
unsigned int writers; // 写端计数
unsigned int waiting_readers; // 等待读端数
unsigned int waiting_writers; // 等待写端数
unsigned int r_counter; // 读计数器
unsigned int w_counter; // 写计数器
struct fasync_struct *fasync_readers;
struct fasync_struct *fasync_writers;
struct pipe_buffer bufs[PIPE_BUFFERS]; // 固定 16 个缓冲槽
};
2. pipe_buffer
struct pipe_buffer {
struct page *page;
unsigned int offset, len;
const struct pipe_buf_operations *ops;
unsigned int flags;
unsigned long private;
};
缓冲区容量:PIPE_BUFFERS 固定为 16,每槽一页(4KB),总容量约 64KB;不可动态扩展。
缓冲区为环形使用:curbuf 指向当前读槽,nrbufs 表示非空槽数量,写入槽索引计算 (curbuf + nrbufs) & (PIPE_BUFFERS - 1)。
核心算法流程
1. 管道创建流程(入口在 arch/x86_64/kernel/sys_x86_64.c,核心逻辑在 fs/pipe.c)
用户空间: pipe(fd[2])
↓
系统调用入口: sys_pipe(arch/x86_64/kernel/sys_x86_64.c 包装,调用 fs/pipe.c 的 do_pipe)
↓
创建 inode & 分配 pipe_inode_info
↓
创建两个 file 结构并安装到 fd[0], fd[1]
2. 数据写入流程
用户空间: write(fd[1], buf, count)
↓
系统调用: sys_write
↓
文件操作: pipe_write()
↓
检查状态:
- 无读端: SIGPIPE / -EPIPE
- 缓冲区满: 阻塞或返回 EAGAIN (O_NONBLOCK)
↓
写入位置: 写入槽 = (curbuf + nrbufs) & (PIPE_BUFFERS-1)
- 如需分配页,则 alloc_page
- copy_from_user 到槽的页
- 更新 len,必要时 nrbufs++
↓
唤醒等待的读者: wake_up_interruptible()
↓
返回写入字节数
3. 数据读取流程
用户空间: read(fd[0], buf, count)
↓
系统调用: sys_read
↓
文件操作: pipe_read()
↓
检查状态:
- 无数据且写端为 0: 返回 0 (EOF)
- 无数据且写端存在: 阻塞或 EAGAIN
↓
读取位置: curbuf 槽,偏移 start/len
- copy_to_user
- 槽读空后释放页,curbuf 前移(按位与取模),nrbufs--
↓
唤醒等待的写者: wake_up_interruptible()
↓
返回读取字节数
4. 管道关闭流程
用户空间: close(fd[0]) 或 close(fd[1])
↓
系统调用: SYSCALL_DEFINE1(close, ...)
↓
VFS 层: filp_close()
↓
文件操作: pipe_release()
↓
更新引用计数:
- 关闭读端: readers--
- 关闭写端: writers--
↓
检查是否需要释放:
- readers == 0 && writers == 0
↓
释放资源:
- 释放页内存
- 释放 pipe_inode_info
- 释放 inode
同步机制
1. 互斥锁(Mutex)
管道使用互斥锁保护关键数据结构:
- 保护范围:pipe_inode_info 结构
- 使用场景:读写操作、状态更新
- 特点:可中断的互斥锁(mutex_lock_interruptible)
2. 等待队列(Wait Queue)
用于实现阻塞 I/O:
- 读等待队列:当缓冲区为空时,读者阻塞
- 写等待队列:当缓冲区满时,写者阻塞
- 唤醒机制:数据到达/空间可用时唤醒
3. 原子操作
使用原子变量保证并发安全:
- head/tail 更新:使用原子操作
- 引用计数:readers/writers 使用原子操作
缓冲区管理
1. 页分配策略
- 初始分配:创建时分配 16 页(64KB)
- 动态扩展:需要时可以扩展(通过 fcntl)
- 页回收:读取后可以回收空闲页
2. 数据布局
页结构:
[页0] [页1] [页2] ... [页N-1]
| | | |
数据 数据 数据 数据
每个页可以存储 4KB 数据,通过 head/tail 指针管理。
3. 边界处理
- 循环回绕:head/tail 到达末尾时回到开头
- 部分写入:跨页边界的写入需要分两次
- 部分读取:跨页边界的读取需要分两次
源码层面的实现
关键数据结构
1. pipe_inode_info
// 位置: include/linux/pipe_fs_i.h
struct pipe_inode_info {
wait_queue_head_t wait; // 单等待队列(读写共用)
unsigned int nrbufs; // 非空缓冲区数量
unsigned int curbuf; // 当前缓冲区索引
struct page *tmp_page; // 临时页
unsigned int readers; // 读端计数
unsigned int writers; // 写端计数
unsigned int waiting_readers; // 等待读端数
unsigned int waiting_writers; // 等待写端数
unsigned int r_counter; // 读计数器
unsigned int w_counter; // 写计数器
struct fasync_struct *fasync_readers;
struct fasync_struct *fasync_writers;
struct pipe_buffer bufs[PIPE_BUFFERS]; // 固定 16 个缓冲槽
};
2. pipe_buffer
// 位置: include/linux/pipe_fs_i.h
struct pipe_buffer {
struct page *page; // 页指针
unsigned int offset, len; // 偏移和长度
const struct pipe_buf_operations *ops; // 操作函数
unsigned int flags;
unsigned long private;
};
3. pipe_file_operations(匿名管道读/写端)
// 位置: fs/pipe.c
static struct file_operations read_pipe_fops = {
.llseek = no_llseek, // 管道不支持随机访问
.read = pipe_read, // 读取管道数据
.readv = pipe_readv, // 向量读取
.write = bad_pipe_w, // 读端禁止写
.poll = pipe_poll, // 支持 poll/epoll
.ioctl = pipe_ioctl, // 管道 ioctl
.release = pipe_read_release, // 读端关闭
.fasync = pipe_read_fasync, // 异步通知(SIGIO)
};
static struct file_operations write_pipe_fops = {
.llseek = no_llseek, // 不支持随机访问
.read = bad_pipe_r, // 写端禁止读
.write = pipe_write, // 写入管道数据
.writev = pipe_writev, // 向量写入
.poll = pipe_poll, // 支持 poll/epoll
.ioctl = pipe_ioctl, // 管道 ioctl
.release = pipe_write_release, // 写端关闭
.fasync = pipe_write_fasync, // 异步通知(SIGIO)
};
关键函数实现
1. pipe() 系统调用入口与核心(以 x86_64 为例)
- 系统调用入口:
arch/x86_64/kernel/sys_x86_64.c中的sys_pipe,负责获取用户参数并调用通用实现,典型代码:
asmlinkage long sys_pipe(unsigned long __user *fildes)
{
int fd[2];
int error = do_pipe(fd); // 核心在 fs/pipe.c
if (!error && copy_to_user(fildes, fd, sizeof(fd)))
error = -EFAULT;
return error;
}
- 核心逻辑:
fs/pipe.c中的do_pipe()完成管道创建与 fd 安装。
关键步骤(do_pipe):
- 分配两个空闲 fd(
get_unused_fd) - 获取管道 inode(
get_pipe_inode→pipe_new→new_inode),初始化pipe_inode_info、等待队列/计数器,设置i_pipe、i_fop=rdwr_pipe_fops - 创建读/写端
file结构并设置f_op(read_pipe_fops/write_pipe_fops) - 安装到
fd[0]/fd[1],返回用户态
2. get_pipe_inode() / pipe_new()
- 位置: fs/pipe.c
- 作用: 分配并初始化管道 inode 与
pipe_inode_info,设置i_pipe,i_fop=rdwr_pipe_fops,初始 readers/writers 为 1。
static struct inode * get_pipe_inode(void)
{
struct inode *inode = new_inode(pipe_mnt->mnt_sb); // 分配 pipefs 上的 inode
if (!inode)
return NULL;
if (!pipe_new(inode)) // kmalloc 清零 pipe_inode_info,init wait/计数
goto fail;
PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 1; // 初始读写端计数
inode->i_fop = &rdwr_pipe_fops; // 读写端共享的 fops(决定后续 split 到读/写 fops)
inode->i_state = I_DIRTY; // 标脏,避免后续 mark_inode_dirty
inode->i_mode = S_IFIFO | S_IRUSR | S_IWUSR; // 类型与权限
inode->i_uid = current->fsuid;
inode->i_gid = current->fsgid;
inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
inode->i_blksize = PAGE_SIZE;
return inode;
fail:
iput(inode);
return NULL;
}
3. pipe_write() 代码示例
static ssize_t pipe_write(struct file *filp, const char __user *buf,
size_t count, loff_t *ppos)
{
struct inode *inode = filp->f_dentry->d_inode;
struct pipe_inode_info *pipe = inode->i_pipe;
ssize_t ret = 0;
while (count > 0) {
int space, idx;
if (!pipe->readers)
return ret ? ret : -EPIPE;
space = PIPE_BUFFERS - pipe->nrbufs;
if (space) {
idx = (pipe->curbuf + pipe->nrbufs) & (PIPE_BUFFERS - 1);
if (!pipe->bufs[idx].page) {
pipe->bufs[idx].page = alloc_page(GFP_USER);
if (!pipe->bufs[idx].page)
return ret ? ret : -ENOMEM;
pipe->bufs[idx].ops = &anon_pipe_buf_ops;
pipe->bufs[idx].offset = 0;
pipe->bufs[idx].len = 0;
}
space = min_t(int, PAGE_SIZE - pipe->bufs[idx].len, count);
if (copy_from_user(page_address(pipe->bufs[idx].page) +
pipe->bufs[idx].len, buf, space))
return ret ? ret : -EFAULT;
pipe->bufs[idx].len += space;
buf += space;
count -= space;
ret += space;
if (pipe->bufs[idx].len == PAGE_SIZE)
pipe->nrbufs++;
wake_up_interruptible(&pipe->wait);
continue;
}
if (filp->f_flags & O_NONBLOCK)
return ret ? ret : -EAGAIN;
pipe->waiting_writers++;
pipe_wait(inode);
pipe->waiting_writers--;
}
return ret;
}
4. pipe_read() 代码示例
static ssize_t pipe_read(struct file *filp, char __user *buf,
size_t count, loff_t *ppos)
{
struct inode *inode = filp->f_dentry->d_inode;
struct pipe_inode_info *pipe = inode->i_pipe;
ssize_t ret = 0;
while (count > 0) {
int idx, avail;
if (pipe->nrbufs == 0) {
if (!pipe->writers)
break; // EOF
if (filp->f_flags & O_NONBLOCK)
return ret ? ret : -EAGAIN;
pipe_wait(inode);
continue;
}
idx = pipe->curbuf;
avail = min_t(int, pipe->bufs[idx].len - pipe->start, count);
if (copy_to_user(buf,
page_address(pipe->bufs[idx].page) + pipe->start, avail))
return ret ? ret : -EFAULT;
pipe->start += avail;
buf += avail;
count -= avail;
ret += avail;
if (pipe->start >= pipe->bufs[idx].len) {
__free_page(pipe->bufs[idx].page);
pipe->bufs[idx].page = NULL;
pipe->bufs[idx].len = pipe->bufs[idx].offset = 0;
pipe->curbuf = (pipe->curbuf + 1) & (PIPE_BUFFERS - 1);
pipe->nrbufs--;
pipe->start = 0;
}
wake_up_interruptible(&pipe->wait);
if (count == 0)
break;
}
return ret;
}
5. pipe_release 系列
// 位置: fs/pipe.c
static int pipe_release(struct inode *inode, int decr, int decw)
{
down(PIPE_SEM(*inode)); // 获取 pipe 信号量
PIPE_READERS(*inode) -= decr; // 更新读端计数
PIPE_WRITERS(*inode) -= decw; // 更新写端计数
if (!PIPE_READERS(*inode) && !PIPE_WRITERS(*inode)) {
free_pipe_info(inode); // 最后一个端关闭,释放资源
} else {
wake_up_interruptible(PIPE_WAIT(*inode)); // 唤醒阻塞的对端
kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN); // 通知读端
kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT); // 通知写端
}
up(PIPE_SEM(*inode)); // 释放信号量
return 0;
}
static int pipe_read_release(struct inode *inode, struct file *filp)
{
pipe_read_fasync(-1, filp, 0); // 取消异步通知
return pipe_release(inode, 1, 0);
}
static int pipe_write_release(struct inode *inode, struct file *filp)
{
pipe_write_fasync(-1, filp, 0); // 取消异步通知
return pipe_release(inode, 0, 1);
}
static int pipe_rdwr_release(struct inode *inode, struct file *filp)
{
int decr = (filp->f_mode & FMODE_READ) != 0;
int decw = (filp->f_mode & FMODE_WRITE) != 0;
pipe_rdwr_fasync(-1, filp, 0); // 取消异步通知
return pipe_release(inode, decr, decw);
}
关键实现细节
等待与唤醒
- 单等待队列
wait(PIPE_WAIT 宏),读空或写满时进入pipe_wait(inode),释放 i_sem、睡眠,事件触发后唤醒并重新持有。
内存管理与性能要点
- 缓冲槽固定 16 页,总容量约 64KB,不支持运行时扩展。
- 槽读空即释放页,避免多余占用;写侧必要时分配页。
- 等待/唤醒依赖单等待队列,持有 i_sem 后再检查条件。
关键源码文件
主要文件
-
fs/pipe.c
- 管道的主要实现文件
- 包含 pipe()、pipe_read()、pipe_write() 等函数
-
include/linux/pipe_fs_i.h
- 管道相关的数据结构定义
- pipe_inode_info、pipe_buffer 等
-
include/linux/pipe_fs_i.h
- 管道文件系统的接口定义
相关系统调用
sys_pipe:创建管道sys_read/sys_write:通过 VFS 读写sys_close:关闭文件描述符
调试和监控
内核调试
# 查看管道统计信息
cat /proc/sys/fs/pipe-max-size
cat /proc/sys/fs/pipe-user-pages-hard
cat /proc/sys/fs/pipe-user-pages-soft
使用 ftrace 跟踪
# 启用管道相关函数跟踪
echo pipe_* > /sys/kernel/debug/tracing/set_ftrace_filter
echo function > /sys/kernel/debug/tracing/current_tracer
使用 perf 分析
# 分析管道相关系统调用
perf trace -e pipe,read,write
总结
Linux 内核中管道的实现具有以下特点:
-
数据结构:
- 使用
pipe_inode_info作为核心数据结构 - 使用环形缓冲区存储数据
- 使用页(page)作为基本存储单元
- 使用
-
同步机制:
- 互斥锁保护关键数据结构
- 等待队列实现阻塞 I/O
- 原子操作保证并发安全
-
性能优化:
- 位运算优化索引计算
- 批量操作减少系统调用
- 零拷贝技术(splice)
-
内存管理:
- 默认 16 页(64KB)缓冲区
- 动态扩展和回收
- 用户内存限制检查
-
错误处理:
- SIGPIPE 信号处理
- 非阻塞模式支持
- 资源限制检查
扩展阅读
- Linux 内核源码:
fs/pipe.c - Linux 内核文档:
Documentation/filesystems/pipefs.txt man 2 pipeman 7 pipe
更多推荐

所有评论(0)