当前位置: 首页 > news >正文

从零开始手写一个协程库(三)

引言

本篇文章是对于调度器(scheduler)的代码实现(代码也是参考了sylar的服务器框架),如果大家有兴趣,可以看一下我的前两篇文章

从零开始手写一个协程库(一)-CSDN博客

从零开始手写一个协程库(二)-CSDN博客

对于调度器的一个基本介绍

首先调度器并不是调度协程,调度器是最高级的,也是只在主线程里面存在的,而调度协程是在每个线程里面存在的,调度协程的任务是帮助本线程处理任务的调度,而调度器则是往这些调度协程里面塞任务的装置。

我们这里是多线程对应多协程,这样可以更好的提高我们的利用率。然后我们也准备了两种模式,一种是主线程参与工作;第二种是主线程不参与工作,主线程只负责操控调度器来分发任务。

所以名义上叫调度器,实际上就是管理一堆的线程,然后在线程里面放一个调度协程,再加上自身的一个任务队列,这就是调度器的本质

具体的细节我们代码里面说~~~

代码实现

这个是我们大类的一个基本架构。首先作为调度器,我们里面是由一个线程池和一个任务队列组成的。然后我们提供了一个scheduleLock()函数,这个函数的目的就是往任务队列里面添加任务。这个任务可以是协程可以是函数,因为一个函数我们最后也会封装成一个协程。但是为了方便和准确,我们还是用一个类模板的形式。

首先我们需要做一个标记,来判断我们需不需要唤醒线程,因为如果之前的队列是空的,那么所有的线程都处于idle模式,我们需要唤醒,如果不是空的,说明已经有线程在工作了,不需要唤醒。

然后我们的idle()函数的目的是让每个线程空闲的协程函数不休息,执行这么一个函数,这样子就不需要来回的切换状态,效率可以大大地提高。

然后就是我们地任务结构体,这个结构体里面封装地几个函数都是为了更加方便地处理这个任务地形式,不仅可以接受对象,还可以接受对象指针,不仅可以是协程,还可以是函数,不过大家要注意一个点,就是无论我们调用哪一个函数,我们都必须要指定一个线程号,这个的目的就是指定我们需要执行这个任务的线程,其他线程不可以执行这一个任务

最后我们用了m_useCaller变量,目的就是标记我们的主线程是不是参加工作

class Scheduler { public: // threads指定线程池的的数量,use_caller指定是否将主线程作为工作线程,name调度器的名称 Scheduler(size_t threads = 1, bool use_caller = true, const std::string& name = "Scheduler"); virtual ~Scheduler(); // 防止资源出现泄漏,基类指针删除派生类对象的时候不完全销毁对象的问题 const std::string& getName() const {return m_name;} public: // 获取正在运行的调度器 static Scheduler* GetThis(); protected: // 设置正在运行的调度器 void SetThis(); public: // 添加到任务队列里面 // FiberOrCb 调度任务类型,可以是协程对象或函数指针 /* 这个函数的逻辑是:因为调用了这个函数,说明有任务进来了,然后判断任务队列是不是空的 如果是空的说明线程们都睡觉在,需要唤醒线程 如果不是空的说明有线程在运行,不需要唤醒线程 */ template <typename FiberOrCb> void scheduleLock(FiberOrCb fc, int thread = -1) { bool need_tickle; // 用于标记任务队列是否为空,从而判断是否需要唤醒线程 { std::lock_guard<std::mutex> lock(m_mutex); need_tickle = m_tasks.empty(); // 创建Task的任务对象 ScheduleTask task(fc, thread); if (task.fiber || task.func) { // 任务的协程或者函数存在一个就可以了 m_tasks.push_back(task); } } if (need_tickle) { // 如果任务队列为空,需要唤醒线程 tickle(); } } virtual void start(); // 启动线程池,启动调度器 virtual void stop(); // 停止线程池,停止调度器,等待所有调度任务完成之后返回 protected: virtual void tickle(); // 唤醒线程 // 线程函数 virtual void run(); // 空闲协程函数,无任务调度的时候执行idle协程 virtual void idle(); // 是否可以关闭调度器,判断是否有线程在运行,是否有任务在列里面 virtual bool stopping(); // 返回是否有空闲的协程 // 当调度协程进入idle时空闲的线程+1,当调度协程返回空闲时,线程数-1 virtual bool hasIdleThread() {return m_idleThreadCount > 0;} private: // 任务 struct ScheduleTask { std::shared_ptr<Fiber> fiber; std::function<void()> func; int thread; // 指定任务需要运行的线程(我们这里是多线程多协程) ScheduleTask() { fiber = nullptr; func = nullptr; thread = -1; } /* 如果外部想保留任务的所有权,就传值:SchedulerTask(fiber, 0) 如果外部想把任务“扔”给调度器,就传指针:SchedulerTask(&fiber, 0) (调度器内部通过 swap 直接接管,零开销!) */ ScheduleTask(std::shared_ptr<Fiber> f, int thr) { fiber = f; thread = thr; } ScheduleTask(std::function<void()> f, int thr) { func = f; thread = thr; } ScheduleTask(std::shared_ptr<Fiber>* f, int thr) { fiber.swap(*f); // 交换指针,避免引用计数增加 thread = thr; } ScheduleTask(std::function<void()>* f, int thr) { func.swap(*f); // 交换指针,避免引用计数增加 thread = thr; } // 重置 void reset() { fiber = nullptr; func = nullptr; thread = -1; } }; private: std::string m_name; std::mutex m_mutex; std::vector<std::shared_ptr<Thread>> m_threads; // 线程池 std::vector<ScheduleTask> m_tasks; // 任务队列 std::vector<int> m_threadIds; // 线程id列表 size_t m_threadCount = 0; // 需要额外创建的线程数量 std::atomic<size_t> m_activeThreadCount{0}; // 正在运行的线程数量 std::atomic<size_t> m_idleThreadCount{0}; // 空闲线程数量 bool m_useCaller; // 是否将主线程作为工作线程 std::shared_ptr<Fiber> m_schedulerFiber; // 如果是->需要额外创建调度协程 int m_rootThread = -1; // 如果是->记录主线程的线程id bool m_stopping = false; // 是否停止调度器运行 };

这个是构造函数和析构函数

对于构造函数来说,它必须要确定主线程的情况,我们首先要设置调度器的位置在主线程里面,用SetThis(),然后如果主线程参加工作,首先我们要初始化环境,同时创建主协程,然后创建子协程(这个协程是调度协程),注意一下我们创建子协程的时候绑定了一个run函数,这个函数就是调度函数,还有一个点,我们传递了false这个参数,目的是因为这个就是调度协程,自然不会受到调度,所以是false。然后就是基本的赋值操作

// 在主线程里面创建了主协程和调度协程 Scheduler::Scheduler(size_t threads, bool use_caller, const std::string& name) : m_useCaller(use_caller), m_name(name) { // 首先判断线程的数量是否大于0,并且调度器的对象是否是空指针,然后调用SetThis()进行设置 assert(threads > 0 && Scheduler::GetThis() == nullptr); SetThis(); Thread::SetName(m_name); // 设置当前线程的名称为调度器的名称 // 使用主线程当作工作线程,创建协程的主要原因是为了实现更加高效的任务调度和管理 if (use_caller) { // 表示当前线程会被当作工作线程 threads--; // 减少线程的数量,因为主线程会被当作工作线程 // 创建主协程 Fiber::GetThis(); // 创建专门运行run()方法的调度协程,这个调度协程负责在当前线程中执行任务和分发循环 m_schedulerFiber.reset(new Fiber(std::bind(&Scheduler::run, this)), 0, false); // 这里是false意味着协程退出之后将返回主协程(相当于任务调度结束,返回到主协程) Fiber::SetSchedulerFiber(m_schedulerFiber.get()); // 设置协程的调度对象 m_rootThread = Thread::GetThreadId(); // 获取当前线程的ID m_threadIds.push_back(m_rootThread); } m_threadCount = threads; // 将剩余的线程数量赋值给m_threadCount } Scheduler::~Scheduler() { assert(stopping() == true); if (GetThis() == this) { t_scheduler = nullptr; } if (debug) { std::cout << "Scheduler::~Scheduler()" << std::endl; } }

然后我想补充一个易错的知识点

这两行代码看起来差不多,其实区别很大。第一个是指针,代表的是每一个线程都可以通过这个指针来访问调度器。第二个是对象,代表的是每一个线程都有一个属于他们自己的主协程。

static thread_local Scheduler* t_scheduler = nullptr; static thread_local std::shared_ptr<Fiber> t_thread_fiber = nullptr;

对于线程池的启动函数start(),我唯一想说的就是每次塞进线程池里面的线程一定是执行run函数。大家可以把run函数理解成一个协程,也就是调度协程,我们每一次初始化一个线程肯定是要先给它一个调度的协程,再插入任务的。

// 判断这个函数是否可以退出 bool Scheduler::stopping() { std::lock_guard<std::mutex> lock(m_mutex); return m_stopping && m_tasks.empty() && m_activeThreadCount == 0; } void Scheduler::start() { std::lock_guard<std::mutex> lock(m_mutex); assert(m_stopping == false); // 调度器还未开启 assert(m_threads.empty()); // 线程池还未开启 m_threads.resize(m_threadCount); for (size_t i = 0; i < m_threadCount; i++) { m_threads[i].reset(new Thread(std::bind(&Scheduler::run, this), m_name + "_" + std::to_string(i))); m_threadIds.push_back(m_threads[i]->getId()); } if (debug) { std::cout << "Scheduler::start()" << std::endl; } }

接下来就是核心run():也就是每个调度协程要干的事情。首先要设置一下调度器的位置,因为调度器只存在于主线程,所以对于子线程,如果要找到调度器,就必须先设置一个调度器指针副本(thread_local),然后因为我们主线程里面的调度协程已经初始化了环境,但是子线程里面的调度协程才刚刚创建,所以也要初始化。然后就到了关键的地方:

首先我们先创建一个执行idle的指针,还有一个任务对象,用来接受要处理的任务。然后就是while()循环,这个循环如果没有调度器通知退出,我们是不会主动退出的。

在循环的一开始,我们要做的就是取任务,这个任务有两个标准,一个是对应这个线程的id,一个是这个任务没有被执行。同时我们需要设置一个tickle_me变量,这个变量为true的条件就是遍历的过程中发现了任务,但是这个任务要么是不属于这个线程要么是已经被其他线程正在执行(因为我们假设是单核CPU,所以这个线程在执行的时候其他线程相当于在睡觉),或者是如果第一个就拿到了任务,但是发现队伍里面还有任务,因为这个任务不是end()的前面一个,那么就标记为true,随后通知一下其他线程,你们有任务了,别睡了。

然后是执行任务,如果是函数,就封装成协程(用智能指针),然后就是resume(),最后结束记得reset()这个任务。但是我们没有这个任务呢?首先主协程也就是调度协程会调用idle的resume(),然后在idle里又会yeild(),这个时候执行权又回到了主协程也就是调度协程这里,一直循环,直到有任务到来

// 调度器的核心,负责从任务队列中取出任务并通过协程执行 void Scheduler::run() { int thread_id = Thread::GetThreadId(); // 获取当前线程的ID if (debug) { std::cout << "Scheduler::run() start" << thread_id << std::endl; } SetThis(); // 设置调度器对象 // 如果当前线程不是主线程,那么就创建一个协程,用于执行任务和分发循环 if (thread_id != m_rootThread) { Fiber::GetThis(); } std::shared_ptr<Fiber> idle_fiber = std::make_shared<Fiber>(std::bind(&Scheduler::idle, this)); ScheduleTask task; while(true) { task.reset(); bool tickle_me = false; // 是否唤醒了其他线程进行任务调度 { std::lock_guard<std::mutex> lock(m_mutex); auto it = m_tasks.begin(); // 遍历任务队列 while(it != m_tasks.end()) { if (it->thread != -1 && it->thread != thread_id) { // 我们只执行没有被分配到的线程,并且任务分配时传递的id号要求和我们的线程一摸一样 tickle_me = true; it++; continue; } assert(it->fiber || it->func); task = *it; m_tasks.erase(it); m_activeThreadCount++; break; // 这里取到了任务就没有必要遍历到尾 } tickle_me = tickle_me || (it != m_tasks.end()); // 仍然存在未处理的任务 } // 具体的代码在io + scheduler if (tickle_me) { tickle(); } // 执行任务 if (task.fiber) { { // resume协程,resume返回时要么此时任务结束了,要么半路yeild了,总之任务完成了,活跃线程-1 std::lock_guard<std::mutex> lock(m_mutex); if (task.fiber->getState() != Fiber::TERM) { task.fiber->resume(); } } m_activeThreadCount--; // 线程执行完任务之后就不再处于活跃状态 task.reset(); } else if (task.func) { // 封装成协程加入调度 std::shared_ptr<Fiber> fiber = std::make_shared<Fiber>(task.func, 0, true); { std::lock_guard<std::mutex> lock(m_mutex); fiber->resume(); } m_activeThreadCount--; // 线程执行完任务之后就不再处于活跃状态 task.reset(); } else { // 无任务,执行idle协程 // 系统关闭-》idle协程将从死循环中出来结束-》此时的idle协程状态为TERM-》再次进入将跳出循环并退出run() if (idle_fiber->getState() == Fiber::TERM) { /* 如果调度器没有调度任务,那么idle协程不断地resume和yield,不会结束进入一个忙等待,如果idle协程结束 一定是调度器停止了,直到有任务才会执行上面的if/else,在这里idle_fiber就是不断和主协程交互的子协程 */ if (debug) { std::cout << "Scheduler::run() ends in thread:" << thread_id << std::endl; } break; } m_idleThreadCount++; idle_fiber->resume(); m_idleThreadCount--; } } }

这个也就是idle的代码

void Scheduler::idle() { while(!stopping()) { if (debug) { std::cout << "Scheduler::idle(), sleeping in thread:" << Thread::GetThreadId() << std::endl; sleep(1); Fiber::GetThis()->yield(); } } }

最后我们要让调度器优雅的终止:有几行十分优雅的代码,std::vector<std::shared_ptr<Thread>> thrs;从这一行开始我们的回收工作,因为我们无法保证在回收的时候没有任务进来,但是回收的时候一定需要加锁,防止数据的竞争,这就导致如果我们直接for()加锁然后join,可能如果有任务需要长时间执行,我们线程又开始工作,然后join一直阻塞,导致这个锁一直释放不了。这个锁释放不了,那些任务也可能执行不了,然后就死锁了。为了避免这个状态,我们在加锁的一瞬间swap一下,把当前的所有线程全部转移到另一个容器里,这个时候我们的join就不需要加锁了,因为没有人会修改thr容器,所以我们完全可以等待所有的线程执行完任务再回收,不会占着锁不释放。

void Scheduler::stop() { if (debug) { std::cout << "Scheduler::stop() in thread :" << Thread::GetThreadId() << std::endl; } if (stopping()) { return; } m_stopping = true; if (m_useCaller) { assert(GetThis() == this); } else{ assert(GetThis() != this); } for (size_t i = 0; i < m_threadCount; i++) { tickle(); // 唤醒空闲线程 } if (m_schedulerFiber) { tickle(); // 唤醒调度器Fiber } if (m_schedulerFiber) { m_schedulerFiber->resume(); // 开始调度任务 if (debug) { std::cout << "Scheduler::stop() ends in thread:" << Thread::GetThreadId() << std::endl; } } // 获取此时的线程通过swap不会增加引用计数的方式加入到thrs,方便下面的join保持线程的正常退出 std::vector<std::shared_ptr<Thread>> thrs; { std::lock_guard<std::mutex> lock(m_mutex); thrs.swap(m_threads); } for (auto& thr : thrs) { thr->join(); } if (debug) { std::cout << "Scheduler::stop() ends in thread:" << Thread::GetThreadId() << std::endl; } }

总结

本篇文章到这里就结束了!!!希望可以帮助大家理解调度器~~~

http://www.jsqmd.com/news/1112828/

相关文章:

  • 【精通】SmartWriter v2.6:写作平台线上运营 — 监控告警、多租户隔离与成本治理深度实战
  • 低配手机如何畅玩高帧率游戏?一文看懂云手机背后的黑科技
  • 如何一键获取九大网盘真实下载链接?LinkSwift浏览器脚本终极指南
  • PostgreSQL 高频常用命令整理
  • ML模型服务化:从Notebook到生产环境的11个关键实践
  • rust 学习 match专项
  • 密码学Crypto-凯撒密码(经典移位密码)
  • CV极极极简发展史
  • Hive 内置函数
  • FastAPI:Python高性能API开发指南
  • Agent工程范式迁移:从确定性代码到非确定性大模型驱动的系统构建
  • 深度解析Whisky:macOS上Windows应用容器化的架构哲学
  • AI Coding 协作实践方案
  • 基础知识-互联网模型
  • 电气自动化专业:站在“绿牌”风口,解锁你的硬核职业版图
  • AI智能体能力评估新范式:从推箱子游戏看规划与推理的进化
  • 农贸市场快检室试剂采购:如何选择适配基层的快检耗材方案
  • JMeter消息队列压测全攻略:从方案设计到性能调优
  • MySQL数据库技术全解析:从SQL语法到实战应用的系统梳理
  • 如何从rand7生成rand5
  • 如何快速找回遗忘的压缩包密码:ArchivePasswordTestTool 完整指南
  • 创业者适合读EMBA吗?2026客观选型测评分析
  • 【无标题】小学期课设
  • 智能激活革命:KMS_VL_ALL_AIO如何彻底改变Windows和Office激活体验
  • utshell:新一代Rust实现的Bash兼容shell完全指南
  • 游戏开发教学方案
  • 成都专业的暖通商家有哪些
  • 基于STM32单片机WIFI云平台物联网 空气质量 烟雾温湿度PM2.5 1(设计源文件+万字报告+讲解)(支持资料、图片参考_相关定制)_
  • Hive 的内置函数
  • 武汉徐东火锅实测|理性避坑+科学选型测评指南