当前位置: 首页 > news >正文

C++ 服务端进阶(四)—— 多 Reactor + 协程:真正的高并发模型(融合版)

一、这一篇到底在做什么?

到目前为止,你已经分别完成了:

  • ✔ 第一篇:Connection(结构抽象)
  • ✔ 第二篇:多 Reactor(并发模型)
  • ✔ 第三篇:单 Reactor + 协程(执行模型)

但这三者是分开的能力

结构 ✔ 并发 ✔ 执行 ✔

👉 真正的高并发服务端,还差最后一步:

把并发模型(多 Reactor)和执行模型(协程)融合

本篇目标

构建这样一套模型:

Main Reactor(accept) ↓ Sub Reactor(线程) + 协程(执行)

一句话总结本篇:

多线程负责“并发”,协程负责“执行”

二、最终架构图(必须理解)

Main Reactor(主线程) ↓ accept ┌──────────────┬──────────────┬──────────────┐ ↓ ↓ ↓ Sub Reactor1 Sub Reactor2 Sub Reactor3 (线程1) (线程2) (线程3) ↓ ↓ ↓ coroutine coroutine coroutine

三、和第三篇的本质区别


第三篇(单线程)

一个 Reactor
+ 多协程

👉 特点:

  • ✔ 无锁
  • ❌ 只能用一个 CPU

第四篇(本篇)

多 Reactor(多线程)
+ 每个 Reactor 内部用协程

👉 特点:

  • ✔ 多核利用
  • ✔ 无锁(连接不跨线程)
  • ✔ 高并发

四、核心设计原则(非常重要)

原则 1:连接只属于一个 Reactor

fd → 固定 Reactor

👉 不跨线程 → 不需要锁


原则 2:每个线程独立事件循环

线程1 → epoll1
线程2 → epoll2


原则 3:协程只在所属 Reactor 线程执行

👉 不跨线程恢复


原则 4:Main Reactor 只做 accept

👉 不处理业务

五、项目结构(最终版)

. ├── Task.h ├── Reactor.h / Reactor.cpp ├── ReactorThread.h / ReactorThread.cpp ├── ReactorThreadPool.h / ReactorThreadPool.cpp ├── SocketUtil.h / SocketUtil.cpp ├── Server.h / Server.cpp └── main.cpp

六、核心代码实现

1️⃣ ReactorThread(线程 + Reactor)

ReactorThread.h

#ifndef REACTOR_THREAD_H #define REACTOR_THREAD_H #include "Reactor.h" #include <thread> #include <memory> class ReactorThread { public: ReactorThread(); ~ReactorThread(); Reactor* getReactor(); void start(); private: std::unique_ptr<Reactor> reactor_; std::thread thread_; }; #endif

ReactorThread.cpp

#include "ReactorThread.h" ReactorThread::ReactorThread() { reactor_ = std::make_unique<Reactor>(); } ReactorThread::~ReactorThread() { if (thread_.joinable()) { thread_.join(); } } Reactor* ReactorThread::getReactor() { return reactor_.get(); } void ReactorThread::start() { thread_ = std::thread([this]() { reactor_->loop(); }); }

2️⃣ ReactorThreadPool

ReactorThreadPool.h

#ifndef REACTOR_THREAD_POOL_H #define REACTOR_THREAD_POOL_H #include "ReactorThread.h" #include <vector> #include <memory> class ReactorThreadPool { public: explicit ReactorThreadPool(int size); void start(); Reactor* getNextReactor(); private: std::vector<std::unique_ptr<ReactorThread>> threads_; int index_; }; #endif

ReactorThreadPool.cpp

#include "ReactorThreadPool.h" ReactorThreadPool::ReactorThreadPool(int size) : index_(0) { for (int i = 0; i < size; ++i) { threads_.emplace_back(std::make_unique<ReactorThread>()); } } void ReactorThreadPool::start() { for (auto& t : threads_) { t->start(); } } Reactor* ReactorThreadPool::getNextReactor() { Reactor* reactor = threads_[index_]->getReactor(); index_ = (index_ + 1) % threads_.size(); return reactor; }

3️⃣ Server(协程版本)

Server.h

#ifndef SERVER_H #define SERVER_H #include "Task.h" class Reactor; DetachedTask handleConnection(int fd, Reactor& reactor); #endif

Server.cpp

#include "Server.h" #include "Reactor.h" #include <unistd.h> #include <errno.h> #include <iostream> #include <string> DetachedTask handleConnection(int fd, Reactor& reactor) { std::string writeBuffer; char buffer[1024]; while (true) { co_await reactor.readable(fd); while (true) { ssize_t n = read(fd, buffer, sizeof(buffer)); if (n > 0) { writeBuffer.append(buffer, n); } else if (n == 0) { reactor.removeFd(fd); close(fd); co_return; } else { if (errno == EAGAIN) break; reactor.removeFd(fd); close(fd); co_return; } } while (!writeBuffer.empty()) { co_await reactor.writable(fd); ssize_t wn = write(fd, writeBuffer.data(), writeBuffer.size()); if (wn > 0) { writeBuffer.erase(0, wn); } else if (errno != EAGAIN) { reactor.removeFd(fd); close(fd); co_return; } } } }

4️⃣ 主线程 accept 分发

main.cpp

#include "Reactor.h" #include "ReactorThreadPool.h" #include "SocketUtil.h" #include "Server.h" #include <sys/socket.h> #include <iostream> int main() { int listenFd = createListenFd(8080); ReactorThreadPool pool(4); pool.start(); Reactor mainReactor; std::cout << "server running :8080" << std::endl; while (true) { int clientFd = accept4(listenFd, nullptr, nullptr, SOCK_NONBLOCK); if (clientFd >= 0) { Reactor* subReactor = pool.getNextReactor(); handleConnection(clientFd, *subReactor); } } return 0; }

七、最核心执行流程(必须理解)

客户端连接 ↓ Main Reactor accept ↓ 选择 Sub Reactor ↓ 启动协程 handleConnection ↓ co_await readable → 挂起 ↓ epoll_wait ↓ fd ready ↓ Reactor 恢复协程 ↓ 继续执行 read/write

八、这一篇真正的价值

你现在掌握的是:

epoll(事件) Reactor(调度) 多线程(并发) 协程(执行)

👉 组合起来就是:

现代高并发服务端模型

九、总结

如果说前三篇是在分别建立结构、并发和执行模型,那么本篇完成的,是这三者的真正融合。

在这个模型中:

  • 多 Reactor 提供并发能力
  • Reactor 提供事件调度能力
  • 协程提供执行模型

三者结合,构成了现代高性能服务端的核心运行方式。

十、下一篇(第五篇)

👉 《Connection + 协程:面向对象的异步模型》

👉Connection + 协程(工程级写法)

你会把:Connection + coroutine

彻底融合成:

class Connection { coroutine run(); };

单 Reactor + 协程,是“执行模型”
多 Reactor + 协程,才是“架构模型”

http://www.jsqmd.com/news/595865/

相关文章:

  • Qwen3-14B部署实战:从零配置到API批量调用的完整链路
  • mmdetection训练VisDrone数据集避坑指南:从数据准备到模型调优全流程
  • 优化element-ui中select下拉框popper在滚动场景下的显示问题
  • Nanbeige4.1-3B实战教程:用600步工具链实现复杂任务自动分解执行
  • CefFlashBrowser:让Flash内容在现代系统中延续生命的技术方案
  • 雷达工程师的视角:线性调频脉冲压缩在实际雷达系统中的作用与参数权衡
  • seo 站群的发展趋势如何
  • Rust并发编程安全实践:从理论到实战
  • VMware管理员必备:VCSA 6.7证书全生命周期管理实战
  • DownKyi完全指南:5个简单步骤让你轻松下载B站高清视频
  • AIGlasses_for_navigation数据管道:Python爬虫获取实时路况数据并注入模型
  • 文脉定序系统开发环境配置:从系统重装到一键部署的完整流程
  • Qwen-Image-2512-ComfyUI入门指南:从安装到生成第一张海报
  • 如何让卡顿电脑重获新生?揭秘WindowsCleaner的5大突破
  • Qwen3.5-2B镜像定制教程:修改System Prompt+更换UI主题+添加快捷指令
  • CUDA内存管理全指南:从锁页内存到托管内存的四种策略详解
  • OpenClaw技能开发入门:为百川2-13B-4bits量化模型定制PDF阅读器
  • Pixel Couplet Gen效果展示:多轮交互式春联优化——用户反馈→LLM重生成→像素重渲染
  • 弦音墨影惊艳效果:‘墨迹’笔刷交互式修正bounding box的主动学习演示
  • 【脑电分析系列】第17篇:EEG 非线性特征在神经疾病诊断中的实战应用 — 从熵到赫斯特指数的综合评估
  • Windows Cleaner:彻底解决C盘爆红问题的免费系统清理工具
  • 2026年高性价比电子防潮箱厂家推荐 - 品牌排行榜
  • Rust与C/C++互操作指南:从理论到实战
  • Qwen3.5-9B模型微调:优化OpenClaw的邮件回复质量
  • GME多模态向量模型功能体验:上传图片输入文字,体验Any2Any搜索魅力
  • 《从同步到消息驱动:现代后端交互模式的深度解析与工程实践》
  • 初学者如何自学SEO优化
  • Nunchaku-flux-1-dev时序预测可视化:结合LSTM生成数据趋势图
  • Rust crate开发与发布指南:从创建到发布
  • 2026大型餐饮隔油设备供应商推荐 - 品牌排行榜