【C/C++】Pipe与Socketpair详解
Pipe(管道)是 Unix/Linux 系统中最基本的进程间通信(IPC, Inter-Process Communication)机制。它提供一个单向的数据通道。Socketpair(套接字对)创建一对相互连接的 Unix domain socket(Unix 域套接字)。与 pipe 不同,它是双向的。│ ││ Pipe 管道: ││ ││ ││ 单向 One-way ││ ││ ││ So
·
Pipe 与 Socketpair 详解
目录
Pipe 管道
什么是 Pipe?
Pipe(管道)是 Unix/Linux 系统中最基本的进程间通信(IPC, Inter-Process Communication)机制。它提供一个单向的数据通道。
工作原理
┌─────────────────────────────────────────────────────────┐
│ │
│ Kernel 内核 │
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Pipe Buffer │ │
│ │ 管道缓冲区 │ │
│ │ │ │
│ │ fd[1] ════════════════════════► fd[0] │ │
│ │ write read │ │
│ │ 写端 读端 │ │
│ │ │ │
│ └─────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
调用 pipe(fd) 后:
fd[0]= 读端(read end)fd[1]= 写端(write end)- 数据只能从
fd[1]流向fd[0],单向流动
配合 fork() 使用
pipe(fd)
│
▼
┌─────────┐
│ Process │
│ fd[0] │
│ fd[1] │
└────┬────┘
│
fork()
│
┌───────────┴───────────┐
▼ ▼
┌──────────┐ ┌──────────┐
│ Parent │ │ Child │
│ 父进程 │ │ 子进程 │
│ │ │ │
│ fd[0] ✗ │ │ fd[0] ✓ │
│ fd[1] ✓ │ │ fd[1] ✗ │
│ │ │ │
│ 写数据 │ ─────────► │ 读数据 │
│ │ │ │
└──────────┘ └──────────┘
✓ = 保留 ✗ = 关闭
代码实现
#include <unistd.h> // pipe, fork, read, write, close
#include <sys/wait.h> // wait
#include <iostream>
#include <cstring>
int main() {
// Step 1: 创建管道
// Create pipe
int fd[2];
if (pipe(fd) == -1) {
perror("pipe failed");
return 1;
}
// fd[0] = read end (读端)
// fd[1] = write end (写端)
// Step 2: 创建子进程
// Create child process
pid_t pid = fork();
if (pid == -1) {
perror("fork failed");
return 1;
}
if (pid > 0) {
// ========== Parent Process 父进程 ==========
// 关闭不需要的读端
// Close unused read end
close(fd[0]);
// 向管道写入数据
// Write data to pipe
const char* msg = "Hello from parent!";
ssize_t bytes_written = write(fd[1], msg, strlen(msg));
std::cout << "Parent: wrote " << bytes_written << " bytes\n";
// 关闭写端,发送 EOF 给子进程
// Close write end, sends EOF to child
close(fd[1]);
// 等待子进程结束
// Wait for child to finish
wait(nullptr);
} else {
// ========== Child Process 子进程 ==========
// 关闭不需要的写端
// Close unused write end
close(fd[1]);
// 从管道读取数据
// Read data from pipe
char buf[256] = {0};
ssize_t bytes_read = read(fd[0], buf, sizeof(buf) - 1);
std::cout << "Child: read " << bytes_read << " bytes\n";
std::cout << "Child: message = \"" << buf << "\"\n";
// 关闭读端
// Close read end
close(fd[0]);
}
return 0;
}
输出:
Parent: wrote 18 bytes
Child: read 18 bytes
Child: message = "Hello from parent!"
双向通信需要两个 Pipe
┌─────────────┐ ┌─────────────┐
│ Parent │ │ Child │
│ 父进程 │ │ 子进程 │
│ │ │ │
│ write ────────── pipe1 ───────────► read │
│ │ 父 → 子 │ │
│ │ │ │
│ read ◄─────────── pipe2 ────────── write │
│ │ 子 → 父 │ │
└─────────────┘ └─────────────┘
#include <unistd.h>
#include <sys/wait.h>
#include <iostream>
#include <cstring>
int main() {
// 需要两个管道实现双向通信
// Need two pipes for bidirectional communication
int parent_to_child[2]; // 父 → 子
int child_to_parent[2]; // 子 → 父
pipe(parent_to_child);
pipe(child_to_parent);
pid_t pid = fork();
if (pid > 0) {
// ========== Parent 父进程 ==========
close(parent_to_child[0]); // 关闭 pipe1 读端
close(child_to_parent[1]); // 关闭 pipe2 写端
// 发送给子进程
// Send to child
const char* msg = "Hello child!";
write(parent_to_child[1], msg, strlen(msg));
close(parent_to_child[1]);
// 从子进程接收
// Receive from child
char buf[256] = {0};
read(child_to_parent[0], buf, sizeof(buf) - 1);
std::cout << "Parent received: " << buf << "\n";
close(child_to_parent[0]);
wait(nullptr);
} else {
// ========== Child 子进程 ==========
close(parent_to_child[1]); // 关闭 pipe1 写端
close(child_to_parent[0]); // 关闭 pipe2 读端
// 从父进程接收
// Receive from parent
char buf[256] = {0};
read(parent_to_child[0], buf, sizeof(buf) - 1);
std::cout << "Child received: " << buf << "\n";
close(parent_to_child[0]);
// 发送给父进程
// Send to parent
const char* reply = "Hello parent!";
write(child_to_parent[1], reply, strlen(reply));
close(child_to_parent[1]);
}
return 0;
}
Socketpair 套接字对
什么是 Socketpair?
Socketpair(套接字对)创建一对相互连接的 Unix domain socket(Unix 域套接字)。与 pipe 不同,它是双向的。
工作原理
┌─────────────────────────────────────────────────────────┐
│ │
│ Kernel 内核 │
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Unix Domain Socket │ │
│ │ Unix 域套接字 │ │
│ │ │ │
│ │ sv[0] ◄══════════════════════► sv[1] │ │
│ │ read/write read/write │ │
│ │ 读/写 读/写 │ │
│ │ │ │
│ │ 双向通信 Bidirectional │ │
│ │ │ │
│ └─────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
调用 socketpair(AF_UNIX, SOCK_STREAM, 0, sv) 后:
sv[0]和sv[1]互相连接- 写入
sv[0]的数据可从sv[1]读取 - 写入
sv[1]的数据可从sv[0]读取 - 双向通信,只需一对 fd
配合 fork() 使用
socketpair(sv)
│
▼
┌─────────┐
│ Process │
│ sv[0] │
│ sv[1] │
└────┬────┘
│
fork()
│
┌───────────┴───────────┐
▼ ▼
┌──────────┐ ┌──────────┐
│ Parent │ │ Child │
│ 父进程 │ │ 子进程 │
│ │ │ │
│ sv[0] ✓ │◄═════════►│ sv[1] ✓ │
│ sv[1] ✗ │ │ sv[0] ✗ │
│ │ │ │
│ 读/写 │ │ 读/写 │
│ │ │ │
└──────────┘ └──────────┘
✓ = 保留 ✗ = 关闭
代码实现
#include <sys/socket.h> // socketpair
#include <sys/wait.h> // wait
#include <unistd.h> // fork, read, write, close
#include <iostream>
#include <cstring>
int main() {
// Step 1: 创建套接字对
// Create socket pair
int sv[2];
if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
// ─────── ───────────
// │ │
// │ └─► 流式套接字 (TCP-like)
// │ 也可用 SOCK_DGRAM (UDP-like)
// │
// └─► 必须是 AF_UNIX (本地通信)
// Must be AF_UNIX (local only)
perror("socketpair failed");
return 1;
}
// sv[0] ◄──────► sv[1] 双向连接
// Step 2: 创建子进程
// Create child process
pid_t pid = fork();
if (pid == -1) {
perror("fork failed");
return 1;
}
if (pid > 0) {
// ========== Parent Process 父进程 ==========
// 关闭不需要的一端
// Close one end
close(sv[1]);
// 发送数据给子进程
// Send data to child
const char* msg = "Hello from parent!";
write(sv[0], msg, strlen(msg));
std::cout << "Parent: sent message\n";
// 接收子进程的回复
// Receive reply from child
char buf[256] = {0};
ssize_t n = read(sv[0], buf, sizeof(buf) - 1);
std::cout << "Parent: received \"" << buf << "\"\n";
close(sv[0]);
wait(nullptr);
} else {
// ========== Child Process 子进程 ==========
// 关闭不需要的一端
// Close one end
close(sv[0]);
// 接收父进程的消息
// Receive message from parent
char buf[256] = {0};
ssize_t n = read(sv[1], buf, sizeof(buf) - 1);
std::cout << "Child: received \"" << buf << "\"\n";
// 发送回复给父进程
// Send reply to parent
const char* reply = "Hello from child!";
write(sv[1], reply, strlen(reply));
std::cout << "Child: sent reply\n";
close(sv[1]);
}
return 0;
}
输出:
Parent: sent message
Child: received "Hello from parent!"
Child: sent reply
Parent: received "Hello from child!"
对比总结
Pipe vs Socketpair
┌─────────────────────────────────────────────────────────────────┐
│ │
│ Pipe 管道: │
│ │
│ fd[1] ─────────────────────────► fd[0] │
│ write read │
│ │
│ 单向 One-way │
│ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Socketpair 套接字对: │
│ │
│ sv[0] ◄────────────────────────► sv[1] │
│ read/write read/write │
│ │
│ 双向 Bidirectional │
│ │
└─────────────────────────────────────────────────────────────────┘
功能对比表
| 特性 Feature | Pipe 管道 | Socketpair 套接字对 |
|---|---|---|
| 通信方向 Direction | 单向 One-way | 双向 Bidirectional |
| 创建方式 Creation | pipe(fd) |
socketpair(AF_UNIX, SOCK_STREAM, 0, sv) |
| 使用 sendmsg | ✗ 不支持 | ✓ 支持 |
| 传递 fd (Pass fd) | ✗ 不支持 | ✓ 支持 |
| 双向通信需要 | 两个 pipe | 一个 socketpair |
| 性能 Performance | 略快 Slightly faster | 略慢 Slightly slower |
| 使用场景 Use case | 简单数据传输 | 需要传递 fd 或双向通信 |
使用 sendmsg/recvmsg 传递文件描述符
为什么需要传递 fd?
每个进程有独立的文件描述符表(file descriptor table)。fd 只是一个数字(0, 1, 2, 3…),直接传递数字没有意义。
┌─────────────────────┐ ┌─────────────────────┐
│ Process A │ │ Process B │
│ 进程 A │ │ 进程 B │
│ │ │ │
│ fd 0 → stdin │ │ fd 0 → stdin │
│ fd 1 → stdout │ │ fd 1 → stdout │
│ fd 2 → stderr │ │ fd 2 → stderr │
│ fd 3 → socket ─────┼──┐ │ fd 3 → ??? │
│ │ │ │ │
└─────────────────────┘ │ └─────────────────────┘
│
│ 直接发送 "3" 没用!
│ Sending "3" is useless!
sendmsg 如何工作
sendmsg 通过 control message(控制消息)让内核在目标进程创建新的 fd,指向相同的内核对象。
┌─────────────────────┐ ┌─────────────────────┐
│ Process A │ │ Process B │
│ 进程 A │ │ 进程 B │
│ │ │ │
│ fd 3 ─────────┐ │ │ ┌──────── fd 5 │
│ │ │ │ │ │
└────────────────│────┘ └────│────────────────┘
│ │
│ sendmsg │
│ SCM_RIGHTS │
▼ ▼
┌─────────────────────────────────┐
│ Kernel 内核 │
│ │
│ ┌──────────────────────┐ │
│ │ Socket 内核对象 │ │
│ │ (同一个对象!) │ │
│ │ (Same object!) │ │
│ └──────────────────────┘ │
│ │
└─────────────────────────────────┘
fd 3 (进程A) 和 fd 5 (进程B) 指向同一个内核对象!
fd 3 (Process A) and fd 5 (Process B) point to same kernel object!
msghdr 结构体
struct msghdr {
void *msg_name; // 目标地址 (socketpair 不需要)
socklen_t msg_namelen; // 地址长度
struct iovec *msg_iov; // 数据缓冲区数组 (scatter/gather I/O)
size_t msg_iovlen; // 缓冲区数量
void *msg_control; // 控制消息 (用于传递 fd)
size_t msg_controllen; // 控制消息长度
int msg_flags; // 标志
};
struct iovec {
void *iov_base; // 缓冲区起始地址
size_t iov_len; // 缓冲区长度
};
struct cmsghdr {
size_t cmsg_len; // 控制消息长度
int cmsg_level; // 协议层 (SOL_SOCKET)
int cmsg_type; // 消息类型 (SCM_RIGHTS 表示传递 fd)
// followed by unsigned char cmsg_data[]
};
┌─────────────────────────────────────────────────────────────┐
│ msghdr │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ msg_iov ──────► iovec[] │ │
│ │ ┌─────────────────┐ │ │
│ │ │ "hello" (数据) │ │ │
│ │ └─────────────────┘ │ │
│ │ 普通数据 Normal data │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ msg_control ──► cmsghdr │ │
│ │ ┌─────────────────┐ │ │
│ │ │ SCM_RIGHTS │ │ │
│ │ │ fd = 5 │ │ │
│ │ └─────────────────┘ │ │
│ │ 控制信息 Control message │ │
│ │ (传递文件描述符) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
完整代码实现
#include <sys/socket.h> // socketpair, sendmsg, recvmsg
#include <sys/wait.h> // wait
#include <unistd.h> // fork, read, write, close
#include <fcntl.h> // open
#include <iostream>
#include <cstring>
/**
* 发送文件描述符到另一个进程
* Send file descriptor to another process
*
* @param sock Unix domain socket (用于通信)
* @param fd 要发送的文件描述符 (file descriptor to send)
* @return 成功返回 0, 失败返回 -1
*/
int send_fd(int sock, int fd) {
// ========== 1. 准备普通数据 ==========
// Must send at least 1 byte of real data
// 必须发送至少 1 字节的实际数据
char buf[1] = {'F'}; // 'F' for "fd"
struct iovec iov;
iov.iov_base = buf;
iov.iov_len = sizeof(buf);
// ========== 2. 准备控制消息 ==========
// Prepare control message for passing fd
// CMSG_SPACE: 计算控制消息需要的空间
char control[CMSG_SPACE(sizeof(int))];
memset(control, 0, sizeof(control));
// ========== 3. 填充 msghdr ==========
struct msghdr msg = {0};
msg.msg_iov = &iov; // 普通数据
msg.msg_iovlen = 1;
msg.msg_control = control; // 控制消息
msg.msg_controllen = sizeof(control);
// ========== 4. 填充 cmsghdr ==========
// CMSG_FIRSTHDR: 获取第一个控制消息头
struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET; // Socket 层
cmsg->cmsg_type = SCM_RIGHTS; // 传递文件描述符
cmsg->cmsg_len = CMSG_LEN(sizeof(int)); // 消息长度
// CMSG_DATA: 获取控制消息数据区指针
// 把 fd 放进去
*reinterpret_cast<int*>(CMSG_DATA(cmsg)) = fd;
// ========== 5. 发送 ==========
ssize_t n = sendmsg(sock, &msg, 0);
return (n >= 0) ? 0 : -1;
}
/**
* 从另一个进程接收文件描述符
* Receive file descriptor from another process
*
* @param sock Unix domain socket (用于通信)
* @return 收到的文件描述符, 失败返回 -1
*/
int recv_fd(int sock) {
// ========== 1. 准备接收普通数据 ==========
char buf[1];
struct iovec iov;
iov.iov_base = buf;
iov.iov_len = sizeof(buf);
// ========== 2. 准备接收控制消息 ==========
char control[CMSG_SPACE(sizeof(int))];
// ========== 3. 填充 msghdr ==========
struct msghdr msg = {0};
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = control;
msg.msg_controllen = sizeof(control);
// ========== 4. 接收 ==========
ssize_t n = recvmsg(sock, &msg, 0);
if (n < 0) {
return -1;
}
// ========== 5. 提取 fd ==========
struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
if (cmsg == nullptr ||
cmsg->cmsg_level != SOL_SOCKET ||
cmsg->cmsg_type != SCM_RIGHTS) {
return -1;
}
// 从控制消息中取出 fd
return *reinterpret_cast<int*>(CMSG_DATA(cmsg));
}
int main() {
// ========== 创建 socketpair ==========
int sv[2];
if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
perror("socketpair");
return 1;
}
pid_t pid = fork();
if (pid > 0) {
// ========== Parent 父进程 ==========
close(sv[1]);
// 打开一个文件
// Open a file
int file_fd = open("test.txt", O_CREAT | O_RDWR | O_TRUNC, 0644);
if (file_fd == -1) {
perror("open");
return 1;
}
// 写入一些内容
// Write some content
const char* content = "Secret message from parent!\n";
write(file_fd, content, strlen(content));
// 重置文件位置到开头
// Reset file position to beginning
lseek(file_fd, 0, SEEK_SET);
std::cout << "Parent: opened file, fd = " << file_fd << "\n";
std::cout << "Parent: sending fd to child...\n";
// 发送 fd 给子进程
// Send fd to child
if (send_fd(sv[0], file_fd) == -1) {
perror("send_fd");
return 1;
}
std::cout << "Parent: fd sent successfully\n";
// 父进程可以关闭自己的 fd 了
// Parent can close its own fd now
close(file_fd);
close(sv[0]);
wait(nullptr);
} else {
// ========== Child 子进程 ==========
close(sv[0]);
std::cout << "Child: waiting for fd...\n";
// 接收 fd
// Receive fd
int received_fd = recv_fd(sv[1]);
if (received_fd == -1) {
perror("recv_fd");
return 1;
}
std::cout << "Child: received fd = " << received_fd << "\n";
// 使用收到的 fd 读取文件内容
// Use received fd to read file content
char buf[256] = {0};
ssize_t n = read(received_fd, buf, sizeof(buf) - 1);
std::cout << "Child: read " << n << " bytes from file\n";
std::cout << "Child: content = \"" << buf << "\"\n";
close(received_fd);
close(sv[1]);
}
return 0;
}
编译和运行:
g++ -o fd_passing fd_passing.cpp
./fd_passing
输出:
Parent: opened file, fd = 5
Parent: sending fd to child...
Parent: fd sent successfully
Child: waiting for fd...
Child: received fd = 4
Child: read 28 bytes from file
Child: content = "Secret message from parent!
"
注意: 父进程的 fd 是 5,子进程收到的 fd 是 4。数字不同,但指向同一个文件!
注意事项
1. Pipe 注意事项
// ❌ 错误:没有关闭不需要的 fd
// Wrong: not closing unused fd
pid_t pid = fork();
if (pid > 0) {
// Parent 没关闭 fd[0]
write(fd[1], "hello", 5);
} else {
// Child 永远不会收到 EOF!
// Child never gets EOF!
read(fd[0], buf, sizeof(buf)); // 可能永远阻塞
}
// ✓ 正确:关闭不需要的 fd
// Correct: close unused fd
pid_t pid = fork();
if (pid > 0) {
close(fd[0]); // 关闭读端
write(fd[1], "hello", 5);
close(fd[1]); // 发送 EOF
} else {
close(fd[1]); // 关闭写端
read(fd[0], buf, sizeof(buf)); // 正常读取直到 EOF
close(fd[0]);
}
2. Socketpair 注意事项
// ❌ 错误:使用 AF_INET
// Wrong: using AF_INET
socketpair(AF_INET, SOCK_STREAM, 0, sv); // 失败!
// ✓ 正确:必须使用 AF_UNIX
// Correct: must use AF_UNIX
socketpair(AF_UNIX, SOCK_STREAM, 0, sv); // 成功
3. sendmsg 注意事项
// ❌ 错误:只发送控制消息,没有数据
// Wrong: only control message, no data
msg.msg_iov = nullptr;
msg.msg_iovlen = 0;
sendmsg(sock, &msg, 0); // 某些系统会失败!
// ✓ 正确:至少发送 1 字节数据
// Correct: send at least 1 byte
char buf[1] = {'x'};
struct iovec iov = {buf, 1};
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
sendmsg(sock, &msg, 0); // 成功
4. 传递 fd 的限制
┌─────────────────────────────────────────────────────────────┐
│ │
│ 可以传递 fd 的情况: │
│ Can pass fd: │
│ │
│ ✓ Unix domain socket (AF_UNIX) │
│ ✓ socketpair 创建的套接字 │
│ │
├─────────────────────────────────────────────────────────────┤
│ │
│ 不能传递 fd 的情况: │
│ Cannot pass fd: │
│ │
│ ✗ Pipe (管道不是 socket) │
│ ✗ TCP socket (不同机器,内核不同) │
│ ✗ UDP socket (同上) │
│ │
└─────────────────────────────────────────────────────────────┘
5. CMSG 宏使用
// 计算控制消息空间
// Calculate control message space
CMSG_SPACE(sizeof(int)) // 包含对齐 padding
// 计算控制消息长度
// Calculate control message length
CMSG_LEN(sizeof(int)) // 不包含尾部 padding
// 获取第一个控制消息头
// Get first control message header
CMSG_FIRSTHDR(&msg)
// 获取下一个控制消息头
// Get next control message header
CMSG_NXTHDR(&msg, cmsg)
// 获取控制消息数据区指针
// Get control message data pointer
CMSG_DATA(cmsg)
6. 实际应用场景
| 场景 Scenario | 使用 Use |
|---|---|
| 简单父子进程通信 | Pipe |
| 需要双向通信 | Socketpair |
| 进程池 (Process pool) | Socketpair + sendmsg |
| 传递 socket 给 worker | sendmsg + SCM_RIGHTS |
| 日志服务器 | Pipe (单向写入) |
进程池架构示例 Process Pool Example:
┌─────────────────────────────────────────┐
│ Master Process │
│ 主进程 │
│ │
│ listen_fd ◄─── 客户端连接 │
│ │ Client connects │
│ ▼ │
│ client_fd = accept(...) │
│ │ │
└────────│────────────────────────────────┘
│
sendmsg 传递 client_fd (通过 Unix socket)
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐
│ Worker 1 │ │ Worker 2 │ │ Worker 3 │
│ 工作进程 │ │ 工作进程 │ │ 工作进程 │
│ │ │ │ │ │
│ recv_fd() │ │ │ │ │
│ 处理请求 │ │ │ │ │
│ │ │ │ │ │
└────────────┘ └────────────┘ └────────────┘
总结
- Pipe - 简单、单向、不能传递 fd
- Socketpair - 双向、可以使用 sendmsg/recvmsg
- sendmsg/recvmsg - 可以传递文件描述符 (通过 SCM_RIGHTS)
- 传递 fd - 只能通过 Unix domain socket,不能跨机器
更多推荐

所有评论(0)