0.简介
在前面文章中,我们对单个进程相关的内容进行了介绍,但进程不是独立存在的,在很多情况下都需要和其他进程进行交互,也就是进程间的通信,Linux中提供的多种IPC实现方式,本文将介绍IPC概念、原理、源码剖析、使用方式等方面来对进程通信进行介绍。
1.IPC概念
IPC(InterProcess Communication,进程通信)是操作系统的核心机制,其用于实现进程间信息交换和协同控制(协同控制其本质上也是进程间的信息交换)。通过这个我们能达到什么效果呢?
1)数据共享:可以在不同进程间传递结构化的消息。
2)资源协调:通过协同控制去避免竞争。
3)功能解耦:允许进程的分工协作,让不同进程去负责不同的任务。
了解了IPC和他可以做到的事情,我们来看其实现方式,我们知道每个进程的地址空间都是独立的,内核空间是共享的,所以基本上进程的通信都是通过内核实现的。
2. IPC实现方式解析
下面将对常见的几种IPC实现方式进行说明(管道,消息队列,共享内存,套接字,信号量,信号):
2.1 管道
管道分为两种,即匿名管道(只能在有亲缘关系的进程间使用,如父子进程)和命名管道(允许没有情缘关系的进程通信)。
2.1.1 匿名管道
匿名管道是内核中的一块环形区域,通过fork继承文件描述符来实现,其最开始创建管道时会产生两个fd,一个用于和内核的读交互,一个和写交互,fork时子进程会继承这两个fd,这样和父进程分别关闭一个,就实现了数据的单向传递。
在linux中,管道的实现是通过一个环状缓冲区实现的,其通过创建两个file结构,指向同一个pipe_inode_info,实现读写的交互。同时设置操作表将read,write操作指向专门的pipe_read和pipe_write函数。
struct pipe_inode_info {
struct mutex mutex; // 互斥锁,保护管道操作
wait_queue_head_t rd_wait, wr_wait; // 读写等待队列
unsigned int head; // 写指针位置
unsigned int tail; // 读指针位置
unsigned int max_usage; // 最大使用量
unsigned int ring_size; // 环形缓冲区大小
unsigned int nr_accounted; // 已计数的页面
unsigned int readers; // 读端引用计数
unsigned int writers; // 写端引用计数
unsigned int files; // 文件描述符引用计数
unsigned int r_counter; // 读计数器
unsigned int w_counter; // 写计数器
struct page *tmp_page; // 临时页面
struct fasync_struct *fasync_readers; // 异步读通知
struct fasync_struct *fasync_writers; // 异步写通知
struct pipe_buffer *bufs; // 管道缓冲区数组
struct user_struct *user; // 所属用户
unsigned int curbuf; // 当前缓冲区索引
};
使用例子如下:
#include <stdio.h>
#include <unistd.h>
int main() {
int fd[2];
pid_t pid;
char buffer[100];
// 创建管道
if (pipe(fd) < 0) {
perror("pipe error");
return 1;
}
// 创建子进程
if ((pid = fork()) < 0) {
perror("fork error");
return 1;
}
if (pid == 0) { // 子进程:写数据
close(fd[0]); // 关闭读端
write(fd[1], "Hello from child!", 17);
close(fd[1]);
} else { // 父进程:读数据
close(fd[1]); // 关闭写端
read(fd[0], buffer, sizeof(buffer));
printf("Parent received: %s\n", buffer);
close(fd[0]);
}
return 0;
}
2.1.2 命名管道
命名管道的创建原理和匿名管道类似,通过创建file结构和设置file类型去创建特殊的文件。和匿名管道主要区别在于匿名管道不会去创建磁盘的对应,只有临时的inode,而命名管道会关联到文件系统。
相关函数如下:
//创建命名管道,pathname是文件路径名,mode是权限模式(如0666)
int mkfifo(const char *pathname, mode_t mode);
//打开管道
int open(const char *pathname, int flags);
//打开后就可以使用相关read,write进行读写
2.2 消息队列
消息队列是保存在内核中的消息链表,其可以允许多个进程去向它读取/写入消息,其存有特定的格式和优先级。其相比于管道来说改进了管道只能承载无格式数据流以及缓冲区大小受限的缺点。
其消息链表主要结构如下:
/* one msq_queue structure for each present queue on the system */
struct msg_queue {
struct kern_ipc_perm q_perm;
time64_t q_stime; /* last msgsnd time */
time64_t q_rtime; /* last msgrcv time */
time64_t q_ctime; /* last change time */
unsigned long q_cbytes; /* current number of bytes on queue */
unsigned long q_qnum; /* number of messages in queue */
unsigned long q_qbytes; /* max number of bytes on queue */
struct pid *q_lspid; /* pid of last msgsnd */
struct pid *q_lrpid; /* last receive pid */
struct list_head q_messages;
struct list_head q_receivers;
struct list_head q_senders;
} __randomize_layout;
其使用方式如下:
#include <stdio.h>
#include <stdlib.h>
#include <sys/msg.h>
struct msg_buffer {
long msg_type;
char msg_text[100];
};
int main() {
key_t key;
int msgid;
struct msg_buffer message;
// 生成唯一键
key = ftok(".", 'a');
// 创建消息队列
msgid = msgget(key, 0666 | IPC_CREAT);
if (msgid == -1) {
perror("msgget error");
exit(1);
}
// 发送消息
message.msg_type = 1;
sprintf(message.msg_text, "Hello from message queue!");
if (msgsnd(msgid, &message, sizeof(message.msg_text), 0) == -1) {
perror("msgsnd error");
exit(1);
}
// 接收消息
if (msgrcv(msgid, &message, sizeof(message.msg_text), 1, 0) == -1) {
perror("msgrcv error");
exit(1);
}
printf("Received: %s\n", message.msg_text);
// 删除消息队列
msgctl(msgid, IPC_RMID, NULL);
return 0;
}
2.3 共享内存
消息队列放入是将数据放入内核链表,获取是从内核拷贝到用户空间,其存在系统调用以及拷贝开销,共享内存克服了这些问题。共享内存是通过将不同的进程虚拟地址映射同一个物理内存,这样他们就可以访问同一块实际空间。
共享内存的核心结构如下,分配的共享内存信息可以使用cat /proc/sysvipc/shm查看。
struct shmid_kernel {
struct kern_ipc_perm shm_perm; // 权限和基本信息结构
struct file *shm_file; // 关联的共享内存文件
unsigned long shm_nattch; // 当前附加计数
unsigned long shm_segsz; // 段大小(字节)
time64_t shm_atim; // 最后附加时间
time64_t shm_dtim; // 最后分离时间
time64_t shm_ctim; // 最后修改时间
struct pid *shm_cprid; // 创建者PID
struct pid *shm_lprid; // 最后操作PID
struct user_struct *mlock_user; // 内存锁定用户信息
/* The task created the shm object. NULL if the task is dead. */
struct task_struct *shm_creator;
struct list_head shm_clist; /* list by creator */
} __randomize_layout;
使用例子如下:
#include <stdio.h>
#include <stdlib.h>
#include <sys/shm.h>
int main() {
key_t key;
int shmid;
char *shm_addr;
// 生成唯一键
key = ftok(".", 'a');
// 创建共享内存段
shmid = shmget(key, 1024, 0666 | IPC_CREAT);
if (shmid == -1) {
perror("shmget error");
exit(1);
}
// 连接共享内存
shm_addr = (char*)shmat(shmid, NULL, 0);
if (shm_addr == (char *)-1) {
perror("shmat error");
exit(1);
}
// 写入数据
sprintf(shm_addr, "Hello from shared memory!");
// 分离共享内存
if (shmdt(shm_addr) == -1) {
perror("shmdt error");
exit(1);
}
// 再次连接共享内存读取数据
shm_addr = (char*)shmat(shmid, NULL, 0);
printf("Read from shared memory: %s\n", shm_addr);
shmdt(shm_addr);
// 删除共享内存段
shmctl(shmid, IPC_RMID, NULL);
return 0;
}
2.4 套接字
上面的通信方式进程需要在一台主机上,而套接字可以在多台主机上实现进程的通信,其简单来说就是使用socket来进行网络通信,这个涉及网络子系统,后面会单独分析网络子系统内容,简单例子如下:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
int main() {
int server_fd, new_socket;
struct sockaddr_in address;
int opt = 1;
int addrlen = sizeof(address);
char buffer[1024] = {0};
const char *hello = "Hello from server!";
// 创建套接字
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
// 设置套接字选项
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
perror("setsockopt");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(8080);
// 绑定套接字
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
// 监听连接
if (listen(server_fd, 3) < 0) {
perror("listen");
exit(EXIT_FAILURE);
}
// 接受连接
if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0) {
perror("accept");
exit(EXIT_FAILURE);
}
// 读取客户端消息
read(new_socket, buffer, 1024);
printf("Client message: %s\n", buffer);
// 发送响应
send(new_socket, hello, strlen(hello), 0);
printf("Hello message sent\n");
// 关闭套接字
close(new_socket);
close(server_fd);
return 0;
}
2.5 信号量
上面方式主要都是用来做数据传输,接下来来看同步使用的方法,也就是信号量,信号量本质上是一个计数器,通过所分层来保护信号量或者保护整个sem_array。
struct sem_array {
struct kern_ipc_perm sem_perm; /* 权限控制结构 */
time64_t sem_ctime; /* 最后操作时间 */
struct list_head pending_alter; /* 改变信号量值的挂起操作 */
struct list_head pending_const; /* 不改变信号量值的复杂操作 */
struct list_head list_id; /* 该信号量集上的undo请求 */
int sem_nsems; /* 信号量数组中信号量数量 */
int complex_count; /* 挂起的复杂操作计数 */
unsigned int use_global_lock;/* 是否需要全局锁 */
struct sem sems[]; /* 柔性数组,实际信号量数组 */
} __randomize_layout;
其使用方式如下:
#include <stdio.h>
#include <stdlib.h>
#include <sys/sem.h>
// 联合体,用于信号量操作
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
};
int main() {
key_t key;
int semid;
union semun arg;
struct sembuf sb;
// 生成唯一键
key = ftok(".", 'a');
// 创建信号量集
semid = semget(key, 1, 0666 | IPC_CREAT);
if (semid == -1) {
perror("semget error");
exit(1);
}
// 初始化信号量值为1(可用)
arg.val = 1;
if (semctl(semid, 0, SETVAL, arg) == -1) {
perror("semctl error");
exit(1);
}
// P操作(获取资源)
sb.sem_num = 0;
sb.sem_op = -1; // 减1
sb.sem_flg = 0;
if (semop(semid, &sb, 1) == -1) {
perror("semop P error");
exit(1);
}
printf("Critical section is acquired.\n");
// V操作(释放资源)
sb.sem_op = 1; // 加1
if (semop(semid, &sb, 1) == -1) {
perror("semop V error");
exit(1);
}
printf("Critical section is released.\n");
// 删除信号量集
if (semctl(semid, 0, IPC_RMID) == -1) {
perror("semctl RMID error");
exit(1);
}
return 0;
}
2.6 信号
信号本质上就是一种软中断,可以实现事件的异步触发,其种类很多,后面单独进行介绍。