【Linux】进程间通信(1)管道进程池实现

张开发
2026/4/6 20:09:38 15 分钟阅读

分享文章

【Linux】进程间通信(1)管道进程池实现
目录一 系统及语言变化二 进程间通信简介1 是什么----本质前提2 为什么要通信3 怎么进行通信三 管道1 什么是管道2 匿名管道原理3 demo代码1创建管道(2)创建子进程3形成单向通信的管道4 管道通信场景1场景12场景23场景34场景4四 实践进程池实现1 任务模块2 子进程工作模块3 Channel 类管道 子进程封装4 进程池核心ProcessPool1 Start () —— 创建 N 个子进程 N 条管道2PushTask () —— 派发任务负载均衡3)Stop () —— 关闭进程池5 main 函数流程控制6 进程池完整代码一 系统及语言变化从这节博客开始我们就把语言切换为C系统由centos7 切换为ubentu 24.04/20.04 vscode远程开发vim被vscode所替代vim虽然写代码的效率高但是不一定开发效率高vscode是一款文件编辑器它是一个轻量化插件式的软件vscode打开文件夹是默认打开你的电脑里的特定目录安装Remote - SSH插件Remote - SSH是 VS Code 的官方插件核心作用是通过 SSH 协议连接远程服务器 / 虚拟机在本地 VS Code 里直接编辑、运行、调试远程代码vscode和xhsell协调步骤1搜索插件Remote-SSH插件安装vscode中2出现远端资源管理器点击后出现加号点击输入 ssh IP地址之后选择一个配置文件右下角就会出下降已添加主机3点击刷新就会出现一台主机左上角点击这一排右边的任意一个按键后输入主机密码就链接成功4在xshell中mkdir xxx创建一个新目录之后在VS中打开文件夹选择对应路径当前打开就在家目录下之后会再让输入一次密码写入代码用crtls保存这样协调操作就完成了vscode中光标在哪一行crtlccrtlv 这一行就会直接复制粘贴crtl~波浪号在vscode中出现命令行终端在这一行下面就可以输入Linux目录就相当于一台小型的机器远端xshell不推荐用vscode做本地和远端的调试推荐使用cgbd好了现在我们可以进入进程间通信的学习了二 进程间通信简介1 是什么----本质前提进程间通信指的就是两个或多个进程进行互相传递信息的过程进程具有独立性--进程内核数据结构代码和数据要规避进程之间的耦合关系所以至少在目前一个进程把自己的数据发送给另一个进程也是一件比较困难的事但是父进程的全局变量不是能交给子进程吗但是这不是通信是父给子的且只能给全局变量所以只能叫做继承数据无法做到持续性通信2 为什么要通信进程间通信目的数据传输一个进程需要将它的数据发送给另一个进程资源共享多个进程之间共享同样的资源。通知事件一个进程需要向另一个或一组进程发送消息通知它它们发生了某种事件如进程终止时要通知父进程。进程控制有些进程希望完全控制另一个进程的执行如 Debug 进程此时控制进程希望能够拦截另一个进程的所有陷入和异常并能够及时知道它的状态改变。3 怎么进行通信假如在操作系统有两个进程A BB不能看到A申请的空间A也不能看B的那么怎么进行通信呢创建一个进程交换的公共场所本质是内存由操作系统提供进程间通信的本质前提让不同的进程看到同一份资源资源由操作系统提供我们后续进行进程间通信时大部分时间都是想办法让进程看到同一份资源利用系统调用操作系统必然提供系统调用为通信接口制定标准我们用到的是System V复用文件部分代码最小的通信代价管道常见的通信方式管道匿名管道命名管道三 管道1 什么是管道管道是Unix中最古老的进程间通信的形式。我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”结论1父子进程为什么会向同一个显示器打印因为指向了同一个文件父进程写好一份文件之后再创建子进程子进程就能共享这份文件一个向里面写入一个向外读取结论2以文件形式继承给子进程这种通信方案叫做管道说明管道本质是文件进程不能用open打开磁盘文件需要系统调用管道在设计之初只允许进行单向数据通信父进程关闭不需要文件描述符管道特点1基于文件的单向数据通信---为了简单化设计不需要考虑数据朝向问题为什么父进程需要董事打开读写为了让子进程继承读写打开的方式2 匿名管道原理#include unistd.h 功能:创建⼀匿名管道 原型 int pipe(int fd[2]); 参数 fd⽂件描述符数组,其中fd[0]表⽰读端, fd[1]表⽰写端 返回值:成功返回0失败返回错误代码这两个文件描述符fd[0]和fd[1]会被填入当前进程的 files_struct 文件描述符表中占下标34的位置指向内核中同一个管道文件对象。每个文件对象都有独立的文件偏移量 pos读端的 pos 记录读取位置写端的 pos 记录写入位置它们共享同一个内核缓冲区和 inode数据会先写入缓冲区再由读端从缓冲区读取匿名管道的本质是操作系统内核在内存中创建的一个临时文件它具备以下特点没有磁盘路径、没有文件名因此也被称为匿名管道不占用磁盘空间数据仅存放在内核的缓冲区中拥有独立的 inode 节点但该节点只存在于内存中不持久化到磁盘3 demo代码我们来写一段代码需求是子进程进行写入父进程读取父子进程传递可变字符串注意C文件后缀.cc .cpp .cxx1创建管道管道头文件#includeunsitd.h输出型数组// pipefd[0] 读端 // pipefd[1] 写端 int pipefd[2] {0}; pipe(pipefd);(2)创建子进程管道特点2管道只能用来让具有血缘关系的进程进行进程间通信常用于父子进程之间进行进程间通信pid_t id fork();3形成单向通信的管道子进程想写保留1关闭0closed(pipefd[0]);父进程想读保留0关闭1closed(pipefd[1]);管道特点3管道的本质是文件一般文件如果打开它的进程推出了那么文件也会被系统自动的关闭打开文件的生命周期随进程父进程关闭子进程不管--管道还在维持有相关文件没有结束子进程写数据函数WriteData父进程读数据函数ReadDataif(id 0) { // 子进程只写关闭读端 close(pipefd[0]); WriteData(pipefd[1]); close(pipefd[1]); exit(0); } else { // 父进程只读关闭写端 close(pipefd[1]); ReadData(pipefd[0]); close(pipefd[0]); }void WriteData(int wfd) { int cnt 1; pid_t id getpid(); while(true) { sleep(1); // 每秒写一次 // 构造消息 std::string message hello father...; message ...; // 向管道写 write(wfd, message.c_str(), message.size()); } }void ReadData(int rfd) { char inbuffer[1024]; while(true) { sleep(5); // 每5秒读一次 // 从管道读 ssize_t n read(rfd, inbuffer, sizeof(inbuffer)-1); if(n 0) { inbuffer[n] \0; std::cout 读到 inbuffer std::endl; } } }管道特点4管道自己内部实现了进程间同步我们整理一下上面的特点1 管道在设计之初只允许进行单项数据通信管道特点之一基于文件的单向数据通信2 管道只能用来让具有血缘关系的进程进行进程间通信常用于父子进程之间进行进程间通信3 管道的本质是文件一般文件如果打开它的进程退出了那么文件也会被系统自动关闭打开的文件的生命周期随进程4管道自己内部是实现了进程间的同步5 管道是面向字节流的4 管道通信场景1场景1写端很慢读端很快以慢的节奏来----父进程等待数据进入即等待子进程进入管道是面向字节流的最朴素的认识--读写次数不匹配读端可以按照自己的需求读读写次数匹配场景发取快递别人给你发了几个快递你就要取几个快递读写不匹配场景一次买了100吨水但是每次的使用量不同可能是10吨也可能是几百毫升2场景2写端很快读端很慢读端会把写端写入的数据能一次读上了就全部读上来极端化场景写端特别快读端不读--管道会被写慢写满后写端怎么办写进程会被阻塞我们这个代码的验证发现写到65536就不写了---65536/102464所以管道会被写满最多写64KB读端不读写端一直写道把管道阻塞3场景3写端不写关闭写端closefwd)读端会怎么样读端把管道内容全部读完会读到0表示end of file读到文件结尾read系统调用read返回0读到文件结尾场景3是不通信正常退出的场景4场景4写端一直写读端不读了且关闭读端close(rwd)操作系统不会做任何浪费时间和空间的事写端一直写毫无意义操作系统会通过信号杀掉写进程13号信号SIGPIPE什么时候用到管道四 实践进程池实现用进程池比再去修改shell更有价值池化技术减少系统调用的次数提高效率调用系统调用成本高池提高效率 进程池预先创建子进程有任务先处理任务不需要有任务时再创建子进程进程池要先创建一批子进程父进程想向哪个子进程写入就向哪个子进程写入控制写入任务码父进程发给子进程的 “指令编号”用来告诉子进程要执行什么任务。不是固定的系统概念是自己定义的一个数字父进程可以通过给任意一个子进程发送任务码的情况写入代码负载均衡如果一直让一个子进程完成任务其他子进程不安排任务就会造成忙闲不均把任务码均匀的撒到每一个子进程让子进程以相同的压力工作提高系统的稳定性---做法可以有随机数轮询计数器....代码整体结构1. 任务模块定义4种任务 任务码 2. 子进程工作逻辑从管道读任务码执行对应任务 3. Channel类管理【子进程PID 父进程写端fd】 4. 进程池类创建N个子进程 管道实现负载均衡 5. main函数加载任务 → 启动进程池 → 推送任务 → 关闭进程池1 任务模块// 任务类型用数字代表要做什么事 —— 这就是【任务码】 #define LOG_TASK 0 // 打印日志 #define DOWNLOAD_TASK 1 // 下载 #define MYSQL_TASK 2 // 访问数据库 #define REDIS_TASK 3 // 访问redis // 所有任务放到一个数组里 std::vectortask_t gtasks; // 加载任务把函数放进任务列表 void LoadTask() { gtasks.push_back(printlog); // 下标0 gtasks.push_back(download); // 下标1 gtasks.push_back(readmysql); // 下标2 gtasks.push_back(writeredis); // 下标3 }任务码 数字编号0、1、2、3 分别代表一种任务子进程收到数字就执行对应函数2 子进程工作模块void Work(int rfd) { while (true) { int code 0; // 子进程阻塞读等待父进程发任务码 ssize_t n read(rfd, code, sizeof(code)); if (n sizeof(int)) { // 执行任务码对应的任务 gtasks[code](); } else if (n 0) { break; // 父进程关闭写端 → 子进程退出 } } }子进程循环等待父进程发任务读到任务码 code直接调用 gtasks[code]() 执行任务父进程关闭管道 → 子进程自动退出3 Channel 类管道 子进程封装class Channel { public: Channel(int wfd, pid_t who) : _wfd(wfd), // 父进程写端fd _sub_process_id(who) // 子进程pid {} // 父进程通过这里发送【任务码】 void SendTask(int taskcode) { write(_wfd, taskcode, sizeof(taskcode)); } };4 进程池核心ProcessPool1 Start () —— 创建 N 个子进程 N 条管道void Start() { for (int i 0; i _number; i) { // 1. 创建管道 int pipefd[2]; pipe(pipefd); // 2. 创建子进程 pid_t id fork(); if(id 0) // 子进程 { close(pipefd[1]); // 子进程关写端 Work(pipefd[0]); // 子进程进入工作循环 close(pipefd[0]); exit(0); } else // 父进程 { close(pipefd[0]); // 父进程关读端 _channels.emplace_back(pipefd[1], id); } } }创建 N 个子进程每个子进程绑定一条独立管道父进程只保留写端子进程只保留读端2PushTask () —— 派发任务负载均衡void PushTask(int taskcode) { int who Next(); // 轮询选一个子进程 _channels[who].SendTask(taskcode); // 发任务码 }轮询调度依次给每个子进程发任务发送的内容就是任务码一个 int 数字3)Stop () —— 关闭进程池void Stop() { for(auto channel: _channels) { channel.Close(); // 关闭写端 → 子进程读到0退出 channel.Wait(); // 回收子进程 } }父进程关闭所有管道写端子进程 read 返回 0 → 自动退出循环父进程 wait 回收防止僵尸进程5 main 函数流程控制int main() { // 1. 加载任务列表 LoadTask(); // 2. 创建进程池例如5个子进程 std::unique_ptrProcessPool pp make_uniqueProcessPool(5); // 3. 启动进程池创建进程管道 pp-Start(); // 4. 推送任务任务码 pp-PushTask(0); pp-PushTask(1); pp-PushTask(2); pp-PushTask(3); // 5. 关闭进程池 pp-Stop(); }6 进程池完整代码#include iostream #include string #include vector #include memory #include functional #include ctime #include cstdlib #include unistd.h #include sys/wait.h #define __MAIN__ ///////////////////////任务测试代码///////////////////////// using task_t std::functionvoid(); void printlog() { // sleep(1); std::cout 我是一个打印日志的任务, pid: getpid() std::endl; } void download() { // sleep(1); std::cout 我是一个下载任务, pid: getpid() std::endl; } void readmysql() { // sleep(1); std::cout 我是一个访问数据库的任务, pid: getpid() std::endl; } void writeredis() { // sleep(1); std::cout 我是一个访问redis的任务, pid: getpid() std::endl; } std::vectortask_t gtasks; void LoadTask() { gtasks.push_back(printlog); gtasks.push_back(download); gtasks.push_back(readmysql); gtasks.push_back(writeredis); } // *: 输出型参数 // const : 输入型参数 // : 输入输出型 void RandomTask(std::vectorint *out) { for (int i 0; i 50; i) { int code rand() % gtasks.size(); usleep(23223); out-push_back(code); } } #define LOG_TASK 0 #define DOWNLOAD_TASK 1 #define MYSQL_TASK 2 #define REDIS_TASK 3 std::string Task2String(int code) { switch (code) { case LOG_TASK: return printlog; case DOWNLOAD_TASK: return download; case MYSQL_TASK: return readmysql; case REDIS_TASK: return writeredis; default: return unknown; } } ///////////////////////进程池代码///////////////////////// void Work(int rfd) { while (true) { int code 0; ssize_t n read(rfd, code, sizeof(code)); if (n sizeof(int)) { if (code 0 code gtasks.size()) { gtasks[code](); } } else if (n 0) { break; // 子进程只要读到返回值为0, 表明父进程让我退出 } else { break; } } } class Channel { public: Channel(int wfd, pid_t who) : _wfd(wfd), _sub_process_id(who) { _name Channel- std::to_string(_sub_process_id) - std::to_string(_wfd); } int Fd() { return _wfd; } pid_t SubId() { return _sub_process_id; } std::string Name() { return _name; } void Close() { if (_wfd 0) close(_wfd); } void Wait() { pid_t rid waitpid(_sub_process_id, nullptr, 0); (void)rid; } void SendTask(int taskcode) { ssize_t n write(_wfd, taskcode, sizeof(taskcode)); (void)n; } ~Channel() { } private: int _wfd; pid_t _sub_process_id; std::string _name; }; class ProcessPool { private: int Next() { int choice _next_choice; _next_choice; _next_choice % _channels.size(); return choice; } public: ProcessPool(int number) : _number(number), _next_choice(0) { std::cout number: _number std::endl; } // 父进程 void Start() { for (int i 0; i _number; i) { // 1. 创建管道 int pipefd[2]; int n pipe(pipefd); if (n 0) { perror(pipe); exit(2); } // 2. 创建子进程 pid_t id fork(); if (id 0) { perror(fork); exit(3); } else if (id 0) // 子进程 { // 关闭父进程历史的wfd for(auto channel : _channels) channel.Close(); close(pipefd[1]); Work(pipefd[0]); close(pipefd[0]); exit(0); } else // 父进程 { close(pipefd[0]); // pipefd[1]; _channels.emplace_back(pipefd[1], id); } } } // 1. 什么任务任务码决定 // 2. 任务给谁属于进程池内部操作负载均衡 void PushTask(int taskcode) { // 选择一个子进程 int who Next(); _channels[who].SendTask(taskcode); std::cout 发送任务: Task2String(taskcode) [ taskcode ] 给: _channels[who].Name() std::endl; } void Stop() { // version 2 ??? for(auto channel: _channels) { channel.Close(); channel.Wait(); std::cout channel.Name() close and wait success! std::endl; } // version3 // int end _channels.size() - 1; // while(end 0) // { // _channels[end].Close(); // _channels[end].Wait(); // std::cout _channels[end].Name() closea and wait success! std::endl; // end--; // } // 内部bug // 1. 关闭wfd -- version1 // for (auto channel : _channels) // { // channel.Close(); // std::cout channel.Name() close success! std::endl; // } // // sleep(3); // // // 2. 回收子进程 // for (auto channel : _channels) // { // channel.Wait(); // std::cout channel.Name() wait success! std::endl; // } } void DebugPrint() { std::cout ------------------------------------- std::endl; for (auto channel : _channels) { std::cout channel.Fd() std::endl; std::cout channel.SubId() std::endl; std::cout channel.Name() std::endl; } std::cout ------------------------------------- std::endl; } ~ProcessPool() {} private: std::vectorChannel _channels; int _number; int _next_choice; }; // 父进程 #ifdef __MAIN__ static void Usage(const std::string proc) { std::cout Usage:\n\t proc process_number std::endl; } // ./process_pool 5 int main(int argc, char *argv[]) { if (argc ! 2) { Usage(argv[0]); exit(1); } int number std::stoi(argv[1]); // 0. 加载任务 srand(time(nullptr) ^ getpid()); LoadTask(); std::vectorint task_codes; RandomTask(task_codes); // 1. 创建进程池对象 std::unique_ptrProcessPool pp std::make_uniqueProcessPool(number); // 2. 启动进程池 pp-Start(); sleep(2); // for (auto task : task_codes) // { // pp-PushTask(task); // usleep(500000); // } // while(true) // { // // int code 0; // // std::cout Please Enter Your Task# ; // // std::cin code; // // if(code 0 || code gtasks.size()) // // { // // std::cout 任务码错误, 请重新输入 std::endl; // // continue; // // } // pp-PushTask(code); // } pp-Stop(); return 0; } #endif

更多文章