从零实现一个进程池(基于管道通信)
前言
进程池是什么?简单说就是预先创建一批子进程,让它们等着,有任务来了就分配一个去执行。这样就不用每次有任务都现fork,省了创建进程的开销。
一、整体设计思路
我们要实现的东西:
父进程创建 N 个子进程
每个子进程和父进程之间用管道连接
父进程通过管道给子进程发“任务码”
子进程收到任务码后执行对应的任务函数
父进程可以控制子进程做什么,也可以通知它们退出
核心数据结构就两个:
Channel:表示一条通信管道(包含写端 fd、名字、子进程 pid) ProcessPool:管理所有 Channel 的进程池二、任务定义(Task.hpp)
#pragma once // 防止头文件重复包含 #include <iostream> #include <string> #include <vector> #include <functional> // 定义任务类型:一个可调用对象,无参数无返回值 using task_t = std::function<void()>; // 任务1:模拟下载功能 void Download() { std::cout << "我是一个下载任务,进程ID: " << getpid() << std::endl; // 这里可以放真正的下载代码 } // 任务2:模拟数据库操作 void MySql() { std::cout << "我是一个 MySQL 任务,进程ID: " << getpid() << std::endl; // 这里可以放真正的数据库操作 } // 任务3:模拟数据同步 void Sync() { std::cout << "我是一个数据刷新同步的任务,进程ID: " << getpid() << std::endl; } // 任务4:模拟日志保存 void Log() { std::cout << "我是一个日志保存任务,进程ID: " << getpid() << std::endl; } // 全局任务表,存放所有可执行的任务 std::vector<task_t> tasks; /** * 初始化类:利用构造函数在 main 之前执行的特性 * 程序启动时自动将任务添加到 tasks 表中 */ class Init { public: Init() { tasks.push_back(Download); tasks.push_back(MySql); tasks.push_back(Sync); tasks.push_back(Log); std::cout << "任务表初始化完成,共有 " << tasks.size() << " 个任务" << std::endl; } }; // 定义一个全局的 Init 对象,main 函数执行前会自动调用构造函数 Init ginit;关键点说明:
using task_t = std::function<void()>:定义了一个类型别名,表示任何可调用的、无参数无返回值的对象(普通函数、lambda 等都可以)全局
tasks向量:存放所有任务,子进程通过下标索引来调用Init ginit:全局对象,在 main 函数执行之前就会构造,自动填充任务表
三、管道封装(Channel.hpp)
每个管道需要记录三样东西:
父进程用来写的文件描述符
这个管道的名字(方便调试和打印)
管道对面是哪个子进程(pid)
#ifndef __CHANNEL_HPP__ #define __CHANNEL_HPP__ #include <iostream> #include <string> #include <unistd.h> #include <sys/wait.h> /** * Channel 类:封装一条管道 * 每个 Channel 对应一个子进程,父进程通过它向子进程发送任务码 */ class Channel { public: // 默认构造函数 Channel() {} /** * 构造函数 * @param fd 父进程写端的文件描述符 * @param name 管道名称(用于调试) * @param id 目标子进程的 pid */ Channel(int fd, const std::string &name, pid_t id) : _wfd(fd), _name(name), _sub_target(id) {} /** * 调试打印函数,显示管道信息 */ void DebugPrint() { printf("channel name: %s, wfd: %d, target pid: %d\n", _name.c_str(), _wfd, _sub_target); } // 获取写端文件描述符 int Fd() { return _wfd; } // 获取管道名称 std::string Name() { return _name; } // 获取目标子进程 pid pid_t Target() { return _sub_target; } /** * 关闭写端文件描述符 * 当父进程关闭所有写端时,子进程的 read 会返回 0 * 子进程据此判断自己应该退出 */ void Close() { close(_wfd); } /** * 等待对应子进程结束 * 回收僵尸进程,防止资源泄漏 */ void Wait() { pid_t rid = waitpid(_sub_target, nullptr, 0); (void)rid; // 防止编译器警告未使用变量 } private: int _wfd; // 父进程写端文件描述符 std::string _name; // 管道名称,如 "channel-0" pid_t _sub_target; // 目标子进程的进程 ID }; #endif设计思路:
_wfd是父进程用来写数据的 fd(子进程那边是对应的读端)Close()关闭写端,这是通知子进程退出的关键:当所有写端关闭,子进程的read会返回 0Wait()等待对应子进程结束,这是父进程退出前必须做的,否则会留下僵尸进程
四、进程池核心(ProcessPool.hpp)
这是最复杂的部分,我们拆成几个小功能来看
#ifndef __PROCESS_POOL_HPP__ #define __PROCESS_POOL_HPP__ #include <iostream> #include <cstdlib> #include <string> #include <vector> #include <functional> #include <unistd.h> #include <sys/types.h> #include <sys/wait.h> #include <ctime> #include "Task.hpp" // 默认创建 5 个子进程 const int gdefault_process_num = 5; // 定义回调函数类型:子进程收到 fd 后要执行的函数 using callback_t = std::function<void(int fd)>; /** * 进程池类 * 管理多个子进程,通过管道向它们分发任务 */ class ProcessPool { private: /** * 辅助函数:控制一个子进程执行任务 * @param index 引用参数,轮询索引 * * 这个函数做三件事: * 1. 选择一个子进程(轮询方式) * 2. 随机选择一个任务 * 3. 通过管道把任务码发给子进程 */ void CtrlSubProcessHelper(int &index) { // 1. 选择一个通道(对应一个子进程) int who = index; // 当前要控制的子进程索引 index++; // 索引自增,下次选下一个 index %= _channels.size(); // 超出则回到0,实现轮询 // 2. 随机选择一个任务 int x = rand() % tasks.size(); // 3. 打印调试信息,看看选了哪个子进程、哪个任务 std::cout << "选择信道: " << _channels[who].Name() << ", subtarget : " << _channels[who].Target() << ", 任务码: " << x << std::endl; // 4. 通过管道把任务码发给子进程 // 注意:这里写的是 int 类型,正好 4 个字节 write(_channels[who].Fd(), &x, sizeof(x)); } public: /** * 构造函数 * @param num 要创建的子进程数量 */ ProcessPool(int num = gdefault_process_num) : _processnum(num) { // 初始化随机种子 // 用时间、当前进程 pid 和一个常量异或,增加随机性 srand(time(nullptr) ^ getpid() ^ 0x777); } ~ProcessPool() {} /** * 初始化进程池:创建子进程和管道 * @param cb 子进程创建后要执行的回调函数 * @return true 成功,false 失败 * * 这个函数是核心,做了几件事: * 1. 循环创建指定数量的子进程 * 2. 每个子进程创建前先创建管道 * 3. fork 后父子进程分别关闭不需要的 fd * 4. 子进程执行回调函数(通常是一个循环等待任务) * 5. 父进程保存管道信息到 _channels */ bool InitProcessPool(callback_t cb) { for (int i = 0; i < _processnum; i++) { sleep(1); // 稍微间隔一下,方便观察创建过程 // 1. 创建管道 int pipefd[2] = {0}; int n = pipe(pipefd); if (n < 0) { std::cerr << "创建管道失败" << std::endl; return false; } // 2. 创建子进程 pid_t id = fork(); if (id < 0) { std::cerr << "fork 失败" << std::endl; return false; } if (id == 0) // 子进程 { // 重要:子进程要关闭从父进程继承下来的所有写端 // 如果不关,会导致管道引用计数不为0,父进程关闭写端时 // 子进程的 read 不会返回 0 for(auto &c : _channels) { c.Close(); // 关闭之前创建的所有管道的写端 } // 子进程只需要读端,关闭写端 close(pipefd[1]); // 子进程执行回调函数 // 这里会进入 Main.cc 里定义的 lambda 循环 cb(pipefd[0]); // 正常情况下不会执行到这里,因为 cb 里是死循环 // 当管道读端返回 0 时,循环退出,子进程 exit exit(0); } // 父进程 // 父进程只需要写端,关闭读端 close(pipefd[0]); // 给管道起个名字,方便调试 std::string name = "channel-" + std::to_string(i); // 将管道信息保存到 _channels 中 // emplace_back 直接在容器末尾构造 Channel 对象,避免拷贝 _channels.emplace_back(pipefd[1], name, id); } return true; } /** * 打印所有管道的写端文件描述符 * 用于调试,查看父进程打开了哪些 fd */ void ProcessPoolPrintFd() { std::cout << "进程池 wfd 列表: "; for (auto &c : _channels) std::cout << c.Fd() << " "; std::cout << std::endl; } /** * 轮询控制子进程(无限循环) * 主要用于测试,会不停地给子进程发任务 */ void PollingCtrlSubProcess() { int index = 0; while (true) { CtrlSubProcessHelper(index); sleep(1); // 每秒发一个任务 } } /** * 轮询控制子进程(有限次数) * @param count 要发送的任务次数 */ void PollingCtrlSubProcess(int count) { if (count < 0) return; int index = 0; while (count) { CtrlSubProcessHelper(index); count--; sleep(1); // 每秒发一个任务 } } /** * 等待所有子进程结束 * 步骤: * 1. 先关闭所有写端,通知子进程退出 * 2. 再等待每个子进程结束,回收僵尸进程 */ void WaitSubProcesses() { // 1. 先关闭所有写端 // 子进程的 read 会因此返回 0,跳出循环并 exit for (auto &c : _channels) { c.Close(); } // 2. 再回收所有子进程 for (auto &c : _channels) { c.Wait(); std::cout << "回收子进程: " << c.Target() << std::endl; } } private: std::vector<Channel> _channels; // 保存所有信道(每个子进程一个) int _processnum; // 子进程数量 }; #endif关键点说明:
子进程为什么要关闭之前的所有写端?
子进程通过
fork继承了父进程的所有文件描述符如果不关闭之前创建的管道写端,这些管道的引用计数就不会降为 0
当父进程关闭自己的写端时,因为子进程还开着,管道实际上还没关
这会导致其他子进程的
read无法返回 0,无法正常退出
为什么用
emplace_back而不是push_back?emplace_back直接在容器内存中构造对象,省去一次拷贝性能更好,代码也更简洁
轮询调度
index每次自增,然后取模,实现简单的轮询负载均衡每个子进程被选中的机会均等
五、主程序(Main.cc)
最后是主程序,把上面所有的东西串起来
#include "ProcessPool.hpp" /** * 主函数 * 流程: * 1. 创建进程池对象 * 2. 初始化进程池(创建子进程) * 3. 打印调试信息 * 4. 控制子进程执行任务 * 5. 等待所有子进程结束 */ int main() { // 1. 创建进程池对象,指定创建 5 个子进程 ProcessPool pp(5); // 2. 初始化进程池 // 传入一个 lambda 表达式,这是每个子进程要执行的代码 pp.InitProcessPool([](int fd){ while(true) { int code = 0; // 子进程阻塞在这里,等待父进程发任务码 ssize_t n = read(fd, &code, sizeof(code)); if(n == sizeof(code)) // 成功读到任务码 { std::cout << "子进程被唤醒: " << getpid() << ", fd: " << fd << std::endl; // 检查任务码是否合法 if(code >= 0 && code < tasks.size()) { // 执行对应的任务函数 tasks[code](); } else { std::cerr << "父进程给我的任务码不对: " << code << std::endl; } } else if(n == 0) { // 读返回 0,说明父进程关闭了写端 // 子进程应该退出了 std::cout << "子进程退出: " << getpid() << std::endl; break; } else { // 读出错 std::cerr << "read fd: " << fd << ", 错误" << std::endl; break; } } }); // 3. 打印所有管道的写端 fd,方便调试 pp.ProcessPoolPrintFd(); // 4. 控制子进程执行 10 次任务 std::cout << "开始控制子进程执行任务..." << std::endl; pp.PollingCtrlSubProcess(10); // 5. 等待所有子进程结束 std::cout << "任务完成,通知子进程退出..." << std::endl; pp.WaitSubProcesses(); std::cout << "父进程控制子进程完成,父进程结束" << std::endl; return 0; }关键点说明:
lambda 表达式的作用
这个 lambda 是每个子进程的主循环
它接收一个
fd参数,这个 fd 是子进程从管道读数据的读端子进程在
read处阻塞,等待父进程发任务收到任务码后执行对应任务,然后继续循环
三种返回值情况的处理
n == sizeof(code):正常读到任务码n == 0:父进程关闭了写端,子进程应该退出n < 0:读出错,子进程也应该退出
为什么 tasks 是全局的
tasks 定义在 Task.hpp 中,是全局变量
子进程通过继承能看到这个全局变量
这样就不用通过管道传递复杂的函数对象,只需要传一个 int 下标
六、编译和运行
Makefile
# Makefile 编译进程池程序 process_pool: Main.cc g++ -o $@ $^ -std=c++11 .PHONY:clean clean: rm -f process_pool运行效果
$ make g++ -o process_pool Main.cc -std=c++11 $ ./process_pool 任务表初始化完成,共有 4 个任务 进程池 wfd 列表: 4 6 8 10 12 开始控制子进程执行任务... 选择信道: channel-0, subtarget : 12345, 任务码: 2 子进程被唤醒: 12345, fd: 3 我是一个数据刷新同步的任务,进程ID: 12345 选择信道: channel-1, subtarget : 12346, 任务码: 0 子进程被唤醒: 12346, fd: 3 我是一个下载任务,进程ID: 12346 ... 任务完成,通知子进程退出... 子进程退出: 12345 子进程退出: 12346 ... 回收子进程: 12345 回收子进程: 12346 ... 父进程控制子进程完成,父进程结束七、总结
这个进程池的实现展示了几个重要概念:
管道通信:父子进程通过匿名管道传递整数任务码
进程池思想:预先创建子进程,避免频繁 fork
负载均衡:轮询方式选择子进程,分配任务
生命周期管理:父进程关闭写端通知子进程退出,然后回收僵尸进程
回调函数:用 lambda 表达式让子进程执行不同的逻辑
