当前位置: 首页 > news >正文

Asio17-MultiThreadPool

Asio17-MultiThreadPool.md

区别

Asio-16也是一种多线程模式,那么他跟这一节有什么区别呢?

简单而言,上一节的结构是,n个线程,每个线程一个io_context在运行,相当于底层多个epoll.这一节呢,也是n个线程,但是只有1个线程有io_context在run,底层相当于一个epoll.

区别在于我们上一节的多线程,每一个线程在各自的io_context取出回调,交给上层的逻辑层去处理,每个线程只管自己的一部分,没有线程安全问题.

这一节的多线程呢,每一个线程都在一个io_context等待着取出回调,然后交给上层的逻辑层.但是多个线程同时在一个任务队列取回调,必然要引发线程安全问题的.那么怎么做呢?我们使用了asio提供的boost::asio::strand来实现串行的取出回调.

结构如下:

image-20250925201539437

首先实现ThreadPool:

// .h
#include "Singleton.hpp"#include <boost/asio.hpp>class ThreadPool : public Singleton<ThreadPool> {
public:friend class Singleton<ThreadPool>;~ThreadPool();boost::asio::io_context& GetContext();void Stop();private:ThreadPool(int num = std::thread::hardware_concurrency());boost::asio::io_context _io_context;std::unique_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> _work_guard;std::vector<std::thread> _threads;
};// .cpp
#include "ThreadPool.h"
#include <boost/asio/io_context.hpp>void ThreadPool::Stop()
{_work_guard.reset();_io_context.stop();for (auto& thread : _threads) {thread.join();}
}ThreadPool::ThreadPool(int num): _work_guard(new boost::asio::executor_work_guard<boost::asio::io_context::executor_type>(_io_context.get_executor()))
{for (std::size_t i = 0; i < num; ++i) {_threads.emplace_back([this]() {_io_context.run();});}
}boost::asio::io_context& ThreadPool::GetContext()
{return _io_context;
}ThreadPool::~ThreadPool()
{Stop();
}

同时CSession添加一个变量:

strand<io_context::executor_type> _strand;

其实流程还是有点难以理解,到底怎么多线程的?

我个人理解,每个CSession都会有一个独立的变量strand,而这个stand类似一个队列,我们看到异步读写的回调函数是这样调用的:

boost::asio::async_read(_socket, boost::asio::buffer(_recv_msg_node->_data, data_len), boost::asio::bind_executor(_strand, std::bind(&CSession::HandleMsg, this, std::placeholders::_1, std::placeholders::_2, SharedSelf())));

实际上当有回调函数的时候,这个任务就会进入这个strand队列之中,而由于所有的CSession里面的strand初始化的时候都绑定了同一个io_context,而这个io_context跑在多个线程之中,那么asio就会分配线程去处理strand中的回调.

实际类似这样:

// 创建3个session
Session session1(io), session2(io), session3(io);// 提交任务
session1.操作("A1");  // strand1队列: [A1]
session1.操作("A2");  // strand1队列: [A1] → [A2]session2.操作("B1");  // strand2队列: [B1]  
session2.操作("B2");  // strand2队列: [B1] → [B2]session3.操作("C1");  // strand3队列: [C1]
session3.操作("C2");  // strand3队列: [C1] → [C2]// 4个线程执行io_context.run(),可能的执行情况:
线程1: 取出strand1的A1执行 → 完成后取出strand1的A2执行
线程2: 取出strand2的B1执行 → 完成后取出strand2的B2执行  // 与线程1并行!
线程3: 取出strand3的C1执行 → 完成后取出strand3的C2执行  // 与其他并行!
线程4: 空闲或执行其他任务

也就是说,每个线程之内是串行的完成队列里的各个任务,而线程间是并行的!

根据测试呢,发现,ThreadServices版本比ThreadPool版本要快1秒左右(前者4s,后者5-6s,客户端100个线程,每个线程500个循环,收发包).

客户端代码

#include <boost/asio.hpp>
#include <chrono>
#include <iostream>
#include <json/json.h>
#include <json/reader.h>
#include <json/value.h>
#include <thread>
#include <vector>using namespace std;
using boost::asio::ip::tcp;constexpr int MAX_LENGTH = 1024 * 2;
constexpr int HEAD_TOTAL = 4; // 2字节id + 2字节len/*---------- 主函数 ----------*/
int main()
{auto start = chrono::high_resolution_clock::now();vector<thread> thr;for (int i = 0; i < 100; ++i) {thr.emplace_back([i] {try {boost::asio::io_context ioc;tcp::socket sock(ioc);sock.connect(tcp::endpoint { {}, 9999 });for (int seq = 0; seq < 500; ++seq) {/* 1. 构造 JSON 正文 */Json::Value root;root["id"] = 1001;root["msg"] = "hello world";string body = root.toStyledString();/* 2. 打包头部 + 正文 */uint16_t msg_id = 1001;uint16_t body_len = static_cast<uint16_t>(body.size());unsigned char out[HEAD_TOTAL + body.size()];memcpy(reinterpret_cast<char*>(out), &msg_id, 2);memcpy(reinterpret_cast<char*>(out) + 2, &body_len, 2);memcpy(reinterpret_cast<char*>(out) + 4, body.data(), body.size());/* 3. 一次性写出 */boost::asio::write(sock, boost::asio::buffer(reinterpret_cast<char*>(out), HEAD_TOTAL + body.size()));/* 4. 收头部 */unsigned char head[HEAD_TOTAL];boost::asio::read(sock, boost::asio::buffer(head, HEAD_TOTAL));uint16_t reply_id = 0, reply_len = 0;memcpy(&reply_id, head, 2);memcpy(&reply_len, head + 2, 2);if (reply_len == 0 || reply_len > MAX_LENGTH - 1) {cerr << "bad reply_len=" << reply_len << '\n';break;}/* 5. 收正文 */// vector<unsigned char> payload(reply_len);char payload[MAX_LENGTH] = { 0 };boost::asio::read(sock, boost::asio::buffer(payload, reply_len));/* 6. 解析 JSON */Json::Value reply;string payload_str(payload, reply_len);if (!Json::Reader {}.parse(payload_str, reply)) {cerr << "json parse fail\n";continue;}cout << "id=" << reply["id"]<< " msg=" << reply["msg"].asString()<< " thr=" << i<< " seq=" << seq << '\n';}} catch (exception& e) {cerr << "Exception: " << e.what() << '\n';}});this_thread::sleep_for(chrono::milliseconds(10));}for (auto& t : thr)t.join();auto dur = chrono::duration_cast<chrono::seconds>(chrono::high_resolution_clock::now() - start);cout << "Time spent: " << dur.count() << " s\n";return 0;
}

这里其实很不"现代",用了很多裸指针,裸数组.主要是为了兼容之前的c风格.如果cpp风格的话,可以使用vector完全替代裸数组.

http://www.jsqmd.com/news/135666/

相关文章:

  • 【大模型技术研究】SGLang入门指南:高效大模型推理与编程的利器(附实战代码)
  • 一个使用 WPF 开发的 Diagram 画板工具(包含流程图FlowChart,思维导图MindEditor)
  • 领导根本不关心你干了多少活,只在意这3点
  • 70
  • Asio12-HandlePacketStickingProblemSimply
  • 第三章 SQL Server函数
  • Chap22-DistributedLock_MultiServer
  • Asio09-SendQueueAndEndian
  • 第四章 SQL Server备份和还原
  • 5分钟使用modelengine打造儿童数字人,小白也能快速上手以低代码的方式快速搭建智能应用,从而大幅降低开发难度
  • 基于springboot在线课程管理系统的设计与实现毕业论文+PPT(附源代码+演示视频)
  • LLM - 用 SpecKit 和 AICode 改造遗留系统 完整实践指南
  • Elasticsearch数据膨胀?调优部署全攻略
  • 【计算机毕业设计案例】基于Java的停车场管理系统、预订车位系统、停车缴费(程序+文档+讲解+定制)
  • Router_路由的基本使用
  • 计算机Java毕设实战-基于Java的停车场管理系统【完整源码+LW+部署说明+演示视频,全bao一条龙等】
  • 快速检查Ubuntu进程是否运行的3种方法
  • Java毕设项目:基于springboot的公司财务预算管理系统(源码+文档,讲解、调试运行,定制等)
  • 贝叶斯优化Transformer-LSTM的模型结构图
  • 番茄小说下载器 2025.12.21 | 现代化、高效的番茄小说下载器,支持批量下载和多种格式导出
  • 计算机Java毕设实战-基于SpringBoot的植物知识管理与分享平台的设计与实现家庭园艺种植分享平台设计与实现【完整源码+LW+部署说明+演示视频,全bao一条龙等】
  • Router_路由重定向和其他小细节问题
  • Java毕设项目:基于SpringBoot+Vue技术的医疗器械管理系统设计与实现(源码+文档,讲解、调试运行,定制等)
  • Redis 数据结构底层与 Hash 优于 JSON 的工程实践
  • STM32平衡车工具-匿名助手+虚拟串口如何使用。
  • 编码器测速思路,以及如何进行测速,速度调整
  • 从零开始学C++:STL简介
  • 【计算机毕业设计案例】基于springboot+vue技术的二手车交易管理系统的设计与实现(程序+文档+讲解+定制)
  • 别再“+”到天亮!String.format 一键拯救Java字符串拼接,高可读+可维护神操作
  • Router_编程式路由