当前位置: 首页 > news >正文

Linux线程同步与互斥(五):线程池的全面实现

线程池是一种“池化”技术(类似内存池、连接池),它预先创建一组线程,然后不断从任务队列中取出任务并执行。这样做的好处是:

  • 避免频繁创建/销毁线程的开销(线程创建和销毁涉及系统调用,成本较高)。

  • 控制并发线程数量,防止因线程过多导致内存耗尽或调度过载。

  • 任务解耦:生产者(提交任务的模块)和消费者(工作线程)通过队列通信,提高响应速度。


一、前置依赖模块

为了专注线程池逻辑,我们先回顾会用到的几个封装类:

1.Thread线程封装

#ifndef _THREAD_H_ #define _THREAD_H_ #include <iostream> #include <string> #include <pthread.h> #include <cstdio> #include <cstring> #include <functional> #include "Log.hpp" using namespace LogModule; namespace ThreadModlue { static uint32_t number = 1; // bug class Thread { using func_t = std::function<void()>; // 暂时这样写,完全够了 private: void EnableDetach() { _isdetach = true; } void EnableRunning() { _isrunning = true; } static void *Routine(void *args) // 属于类内的成员函数,默认包含this指针! { Thread *self = static_cast<Thread *>(args); self->EnableRunning(); if (self->_isdetach) self->Detach(); pthread_setname_np(self->_tid, self->_name.c_str()); self->_func(); // 回调处理 return nullptr; } // bug public: Thread(func_t func) : _tid(0), _isdetach(false), _isrunning(false), res(nullptr), _func(func) { _name = "thread-" + std::to_string(number++); } void Detach() { if (_isdetach) return; if (_isrunning) pthread_detach(_tid); EnableDetach(); } std::string Name() { return _name; } bool Start() { if (_isrunning) return false; int n = pthread_create(&_tid, nullptr, Routine, this); if (n != 0) { return false; } else { return true; } } bool Stop() { if (_isrunning) { int n = pthread_cancel(_tid); if (n != 0) { return false; } else { _isrunning = false; return true; } } return false; } void Join() { if (_isdetach) { return; } int n = pthread_join(_tid, &res); if (n != 0) { LOG(LogLevel::DEBUG) << "join线程失败"; } else { LOG(LogLevel::DEBUG) << "join线程成功"; } } ~Thread() { } private: pthread_t _tid; std::string _name; bool _isdetach; bool _isrunning; void *res; func_t _func; }; } #endif

关键点:线程的执行体是_funcStart中通过pthread_create调用一个静态函数,再调用_func

2. 互斥锁封装

#pragma once #include <iostream> #include <pthread.h> namespace MutexModule { class Mutex { public: Mutex() { pthread_mutex_init(&_mutex, nullptr); } void Lock() { int n = pthread_mutex_lock(&_mutex); (void)n; } void Unlock() { int n = pthread_mutex_unlock(&_mutex); (void)n; } ~Mutex() { pthread_mutex_destroy(&_mutex); } pthread_mutex_t *Get() { return &_mutex; } private: pthread_mutex_t _mutex; }; class LockGuard { public: LockGuard(Mutex &mutex) : _mutex(mutex) { _mutex.Lock(); }; ~LockGuard() { _mutex.Unlock(); }; private: Mutex &_mutex; }; }

3. 条件变量封装(Cond.hpp

#pragma once #include <iostream> #include <pthread.h> #include "Mutex.hpp" using namespace MutexModule; namespace CondModule { class Cond { public: Cond() { pthread_cond_init(&_cond, nullptr); } void Wait(Mutex &mutex) { int n = pthread_cond_wait(&_cond, mutex.Get()); (void)n; } void Signal() { // 唤醒在条件变量下等待的一个线程 int n = pthread_cond_signal(&_cond); (void)n; } void Broadcast() { // 唤醒所有在条件变量下等待的线程 int n = pthread_cond_broadcast(&_cond); (void)n; } ~Cond() { pthread_cond_destroy(&_cond); } private: pthread_cond_t _cond; }; };

4. 日志系统(Log.hpp

#ifndef __LOG_HPP__ #define __LOG_HPP__ #include <iostream> #include <filesystem> //c++17 #include <fstream> #include <string> #include <cstdio> #include <memory> #include <unistd.h> #include <ctime> #include <sstream> #include "Mutex.hpp" using namespace MutexModule; namespace LogModule { const std::string gsep = "\r\n"; // 策略模式 -- C++多态特性 // 2.刷新策略 a:显示器打印 b:向指定的文件写入 // 刷新策略基类 class LogStrategy { public: ~LogStrategy() = default; virtual void SyncLog(const std::string &message) = 0; private: }; // 显示器打印日志的策略:子类 class ConsoleLogStrategy : public LogStrategy { public: ConsoleLogStrategy() { } void SyncLog(const std::string &message) override { LockGuard lockguard(_mutex); std::cout << message << gsep; } ~ConsoleLogStrategy() { } private: Mutex _mutex; // 显示器也是临界资源,保证输出线程安全 }; // 文件打印日志的策略 : 子类 const std::string defaultPath = "./log"; const std::string defaultfile = "log.log"; class FileLogStrategy : public LogStrategy { public: FileLogStrategy(const std::string &path = defaultPath, const std::string &file = defaultfile) : _path(path), _file(file) { LockGuard lockguard(_mutex); if (std::filesystem::exists(_path)) { return; } try { std::filesystem::create_directories(_path); } catch (const std::filesystem::filesystem_error &e) { std::cerr << e.what() << "\n"; } } void SyncLog(const std::string &message) override { LockGuard lockguard(_mutex); std::string filename = _path + (_path.back() == '/' ? "" : "/") + _file; //"./log/" + "my.log" std::ofstream out(filename, std::ios::app); // 以追加写入的方式打开 if (!out.is_open()) { return; } out << message << gsep; out.close(); } ~FileLogStrategy() { } private: std::string _path; // 日志文件所在的路径 std::string _file; // 日志文件本身 Mutex _mutex; }; // 形成一条完整的日志&&根据上面的策略,选择不同的刷新方式 // 1.形成日志等级 enum class LogLevel { DEBUG, INFO, WARNING, ERROR, FATAL }; std::string Level2Str(LogLevel level) { switch (level) { case LogLevel::DEBUG: return "DEBUG"; case LogLevel::INFO: return "INFO"; case LogLevel::WARNING: return "WARNING"; case LogLevel::ERROR: return "ERROR"; case LogLevel::FATAL: return "FATAL"; default: return "UNKNOWN"; } } std::string GetTimeStamp() { time_t curr = time(nullptr); struct tm curr_tm; localtime_r(&curr, &curr_tm); char timebuffer[128]; snprintf(timebuffer, sizeof(timebuffer), "%4d-%02d-%02d %02d:%02d:%02d", curr_tm.tm_year+1900, curr_tm.tm_mon+1, curr_tm.tm_mday, curr_tm.tm_hour, curr_tm.tm_min, curr_tm.tm_sec); return timebuffer; } // 1.形成日志 && 2.根据不同的策略,完成刷新 class Logger { public: Logger() { EnableConsoleLogStrategy(); } void EnableFileLogStrategy() { _fflush_strategy = std::make_unique<FileLogStrategy>(); } void EnableConsoleLogStrategy() { _fflush_strategy = std::make_unique<ConsoleLogStrategy>(); } // 内部类 表示的是未来的一条日志 class LogMessage { public: LogMessage(LogLevel &level, std::string &src_name, int line_number, Logger &logger) : _curr_time(GetTimeStamp()), _level(level), _pid(getpid()), _src_name(src_name), _line_number(line_number), _logger(logger) { // 日志左边部分,合并起来 std::stringstream ss; ss << "[" << _curr_time << "]" << "[" << Level2Str(_level) << "]" << "[" << _pid << "]" << "[" << _src_name << "]" << "[" << _line_number << "]" << "- "; _loginfo = ss.str(); }; // LogMessage() << "hello world" << "xxxx" << 3.14 << 1234; // 需要支持重载 template <typename T> LogMessage &operator<<(const T &info) { // 日志右边部分,可变的 std::stringstream ss; ss << info; _loginfo += ss.str(); return *this; } ~LogMessage() { if (_logger._fflush_strategy) { _logger._fflush_strategy->SyncLog(_loginfo); } }; private: std::string _curr_time; LogLevel _level; pid_t _pid; std::string _src_name; int _line_number; std::string _loginfo; // 合并完成之后,一条完整的信息 Logger &_logger; }; // 这里故意写成返回临时对象 LogMessage operator()(LogLevel level, std::string name, int line) { return LogMessage(level, name, line, *this); } ~Logger() {} private: std::unique_ptr<LogStrategy> _fflush_strategy; }; // 全局日志对象 Logger logger; // 使用宏,简化用户操作,获取文件名和行号 #define LOG(level) logger(level, __FILE__, __LINE__) #define Enable_Console_Log_Stratege() logger.EnableConsoleLogStrategy(); #define Enable_File_Log_Stratege() logger.EnableFileLogStrategy(); } #endif

5.Makefile

threadpool:Main.cc g++ -o $@ $^ -std=c++17 -lpthread .PHONY:clean clean: rm -f threadpool

二、线程池

2.1 ThreadPool.hpp

#pragma once #include <iostream> #include <string> #include "Log.hpp" #include <vector> #include <queue> #include "Cond.hpp" #include " Thread.hpp" namespace ThreadPoolModule { using namespace ThreadModlue; using namespace LogModule; using namespace CondModule; using namespace MutexModule; static const int gnum = 4; template <typename T> class ThreadPool { private: void WakeUpAllThread() { LockGuard localguard(_mutex); if (_sleepernum) _cond.Broadcast(); LOG(LogLevel::INFO) << "唤醒所有的休眠的线程"; } void WakeUpOne() { _cond.Signal(); LOG(LogLevel::INFO) << "唤醒一个的休眠的线程"; } public: ThreadPool(int num = gnum) : _num(num), _isrunning(false), _sleepernum(0) { for (int i = 0; i < num; i++) { _threads.emplace_back( [this]() { HandlerTask(); }); } } void Start() { if (_isrunning) return; // 如果线程已经启动了,返回 _isrunning = true; for (auto &thread : _threads) { thread.Start(); LOG(LogLevel::INFO) << "create new thread success: " << thread.Name(); } } void Stop() { if (!_isrunning) return; _isrunning = false; // 唤醒所有线程 WakeUpAllThread(); } void Join() { for (auto &thread : _threads) { thread.Join(); } } void HandlerTask() { char name[128]; pthread_getname_np(pthread_self(), name, sizeof(name)); while (true) { T t; { LockGuard lockguard(_mutex); // 1.a.队列是否为空 b.线程池没有退出 while (_taskq.empty() && _isrunning) { _sleepernum++; _cond.Wait(_mutex); _sleepernum--; } // 2.内部的线程被唤醒 if (!_isrunning && _taskq.empty()) { LOG(LogLevel::INFO) << name << "退出了,线程池退出&&任务队列为空"; break; } // 一定有任务 t = _taskq.front(); // 从q中获取任务,任务已经是线程私有的了 _taskq.pop(); } t(); // 处理任务,需要在临界区内部处理吗? } } bool Enqueue(const T &in) { if (_isrunning) { LockGuard lockguard(_mutex); _taskq.push(in); if (_threads.size() - _sleepernum == 0) WakeUpOne(); return true; } return false; } ~ThreadPool() {}; private: std::vector<Thread> _threads; int _num; // 线程池中,线程的个数 std::queue<T> _taskq; Cond _cond; Mutex _mutex; bool _isrunning; int _sleepernum; }; }

2.2 Task.hpp

#pragma once #include <iostream> #include <unistd.h> #include <functional> #include "Log.hpp" using task_t = std::function<void()>; using namespace LogModule; void Download() { LOG(LogLevel::DEBUG) << "我是一个下载任务..."; }

2.3 Main.cc

#include "Log.hpp" #include "ThreadPool.hpp" #include <memory> #include "Task.hpp" using namespace LogModule; using namespace ThreadPoolModule; int main() { Enable_Console_Log_Stratege(); ThreadPool<task_t> *tp = new ThreadPool<task_t>(); tp->Start(); int count = 10; while(count) { tp->Enqueue(Download); sleep(1); count--; } tp->Stop(); tp->Join(); }

2.4 常见问题解析

1. 模板参数T

ThreadPool是一个模板类,T代表任务类型。通常我们使用std::function<void()>作为任务类型,这样任何可调用对象(函数指针、lambda、std::bind结果等)都可以作为任务包装。

2. 工作线程的主循环HandlerTask

void HandlerTask() { char name[128]; pthread_getname_np(pthread_self(), name, sizeof(name)); while (true) { T t; { LockGuard lockguard(_mutex); // 1.a.队列是否为空 b.线程池没有退出 while (_taskq.empty() && _isrunning) { _sleepernum++; _cond.Wait(_mutex); _sleepernum--; } // 2.内部的线程被唤醒 if (!_isrunning && _taskq.empty()) { LOG(LogLevel::INFO) << name << "退出了,线程池退出&&任务队列为空"; break; } // 一定有任务 t = _taskq.front(); // 从q中获取任务,任务已经是线程私有的了 _taskq.pop(); } t(); // 处理任务,需要在临界区内部处理吗? } }

线程池中工作线程的核心循环它的作用是:不断从任务队列中取出任务并执行,如果队列为空就休眠等待,当线程池要退出时能正确终止。

为什么要用while而不是if

  • 条件变量可能被伪唤醒(spurious wakeup),或者被NotifyAll唤醒时队列仍然为空(比如另一个线程抢先取走了任务)。

  • while循环确保被唤醒后重新检查条件,只有队列非空或线程池停止时才退出循环。

_sleepernum的作用

  • 记录当前有多少线程因为队列空而处于休眠等待状态。

  • Enqueue时,如果_sleepernum > 0,可以只唤醒一个线程(_cond.Notify())来取任务,避免唤醒所有线程造成“惊群效应”。这是一种性能优化。

_cond.Wait(_mutex)的原子操作

  • 进入Wait后,会自动释放_mutex锁,然后当前线程挂起。

  • 当被NotifyNotifyAll唤醒后,会重新竞争_mutex锁,获得锁后才从Wait返回。

  • 因此,从Wait返回时,线程已经再次持有锁,可以安全地访问共享的_taskq

3. 检查退出条件

if (!_isrunning && _taskq.empty()) { LOG(LogLevel::INFO) << name << "退出了,线程池退出&&任务队列为空"; break; }

4. 执行任务(无锁)

t(); // 处理任务,在临界区外执行

为什么要在临界区外执行任务?

  • 任务执行可能耗时很长(比如处理复杂计算或 I/O)。如果在锁内执行,其他工作线程无法访问任务队列,导致并发度下降。

  • 先取出任务并释放锁,让其他线程可以同时取任务或入队,提高吞吐量。

5. 完整循环的三种退出路径

情况条件行为
正常处理任务队列非空取任务,执行,继续循环
队列空且线程池运行中_taskq.empty() && _isrunning进入Wait休眠,直到被唤醒
线程池停止且队列空!_isrunning && _taskq.empty()break退出循环,线程结束
http://www.jsqmd.com/news/704668/

相关文章:

  • 如何用Umi-OCR告别截图文字手打?离线OCR的5个效率倍增技巧
  • 比较能源系统优化调度的深度强化学习算法:DDPG、TD3、SAC和PPO的性能与可行性
  • 多模态传感器自动校准技术解析与应用实践
  • 深入浅出 Kubernetes 网络【20260426-003篇】
  • 5分钟掌握EB Garamond 12:免费商用复古字体终极指南
  • 【OpenClaw养虾】从零开始部署安装,接入机器人
  • 使用 Operator 框架管理有状态应用
  • 3步搞定Windows风扇控制:FanControl让你的电脑散热更智能
  • Boot Camp驱动自动化革命:Brigadier如何将45分钟部署压缩至5分钟
  • 2026年3月商标购买网站哪里有,购买注册商标/商标注册购买/闲置商标转让/注册商标转让,商标购买渠道哪家靠谱 - 品牌推荐师
  • 如何用Umi-OCR快速提取截图文字:从新手到高手的完整指南
  • AI代码执行沙箱从POC到生产环境的生死7步(附Gartner评估矩阵与内部审计检查表)
  • 如何一次性解决所有Visual C++运行库问题:终极修复指南
  • 如何高效修复损坏视频:Untrunc完整实用指南
  • 网页隐性载荷滥用,催生 AI 助手全新攻击范式
  • Qt之状态机 - scrutiny
  • 留一交叉验证(LOOCV)原理与scikit-learn实战指南
  • 软件服务中的客户成功体系建设
  • 国产芯片适配进度告急!MCP 2026强制认证倒计时180天,你还在用X86测试环境凑合?
  • HPH的构造是怎样的 核心部件全解析
  • PathOfBuilding实战指南:3大核心功能助你高效构建流放之路角色
  • 如何彻底解决macOS滚动方向混乱问题:Scroll Reverser完整配置指南
  • STM32智能门锁避坑指南:RFID读卡器选型、FLASH存储异常与舵机供电那些事儿
  • NI-DAQmx计数器频率测量全攻略:从低频到高频,三种方法怎么选不踩坑?
  • LLaMA-Factory数据集格式详解与高质量数据构建方法-方案选型对比
  • [具身智能-464]:语音识别与语音合成的关键和核心是模型文件,分别阐述它们的输入和输出
  • RimWorld终极免费模组管理器:3步解决模组冲突,轻松管理200+模组 [特殊字符]
  • flutter
  • 联想小新电脑关闭键盘灯
  • 一个功能完整的在线单词搜索游戏网站:主题丰富 + 多语言 + 自定义题目 + 可分享可打印