当前位置: 首页 > news >正文

从零开始手写一个协程库(二)

引言

这是关于协程库的第二篇文章,如果大家刚开始看这篇文章,建议先看一下我之前的这一篇。

从零开始手写一个协程库(一)-CSDN博客

本篇文章是对协程(fiber)类的一个代码实现,其中包含了对协程的重新恢复,创建,删除,暂停,重利用等操作

架构简介

对于一个协程它的状态分为三种,一种是READY,一种是TERM,一种是RUNNIG。到底是哪个协程该工作,什么时候工作,工作完之后状态怎么样,怎么处理,这都需要一个调度器来决定,这个调度器的任务就是来分配工作,回收资源,可能听起来有点像内存池做的事情。

我们这个协程库里,做了两手准备,第一个是调度器就是单独的一个协程,每一次当一个协程执行完之后都要把执行权还给调度器;第二个是主协程来调度。

然后就是代码的设计,我们对于主协程和子协程有不同的处理方式,所以自然创建方式也不同。然后我们的主协程是由GetThis创建的,因为如果一开始没有任何协程在运行,当我们想获取一个协程实例的时候自然就是创建一个协程,而这个协程就是我们的主协程

#ifndef FIBER_H #define FIBER_H #include <memory> #include <functional> #include <cstdint> #include <ucontext.h> #include <mutex> #include <atomic> #include <iostream> #include <cassert> #include <vector> namespace fengyue { class Fiber : public std::enable_shared_from_this<Fiber> { public: enum State{ READY, // 就绪状态 RUNNING, // 运行中状态 TERM // 已完成状态 }; private: Fiber(); // 细节,Fiber是私有的,只能被GetThis()方法调用,用于创建主协程 public: // 用于创建指定的回调函数,栈大小和run_in_scheduler 本协程是否参与调度器的调度,默认为true Fiber(bool for_pool); Fiber(std::function<void()> func, size_t stack_size = 0, bool run_in_scheduler = true); ~Fiber(); public: void reset(std::function<void()> func); // 重置协程状态和入口,重复利用栈空间,不用重新创建栈 void resume(); // 恢复协程执行 void yield(); // 暂停,将执行权让给调度 uint64_t getId() const {return m_id;} State getState() const {return m_state;} public: static void SetThis(Fiber* fiber); // 设置当前运行的协程 static std::shared_ptr<Fiber> GetThis(); // 获取当前运行的协程的实例指针 static void SetSchedulerFiber(Fiber* fiber); // 设置调度器协程,默认主协程 static uint64_t GetFiberId(); // 获取当前运行的协程的ID static void MainFunc(); // 协程的主函数,入口点 private: uint64_t m_id; // 协程的唯一ID State m_state = READY; // 协程的状态 uint32_t m_stack_size = 0; // 协程的栈大小,默认0表示使用默认栈大小 ucontext_t m_ctx; // 协程的上下文 void* m_stack = nullptr; // 协程的栈指针 std::function<void()> m_func; // 协程的回调函数 bool m_runInScheduler = true; // 是否将执行器交给调度器,默认为true bool m_for_pool = false; public: std::mutex m_mutex; // 协程的互斥锁 }; } #endif

代码

首先我们要声明几个全局变量,一个是正在运行的协程指针,每一次拿到这个指针就相当于拿到了这个协程里面的某一个运行的协程,第二个是主协程的指针,这个在创建的时候就已经确定了,第三个就是调度协程,我们这里默认就是主协程

SetThis()函数就是设置当前正在运行的协程

GetThis()这个函数要干两件事情,首先第一个if()判断,如果已经有协程正在运行了,那不管是主协程还是子协程,说明我们的主协程已经完全创建好了,那么我们直接返回一个智能指针。不过大家可能都注意到了,我们返回的全部都是t_fiber->shared_from_this(),这个我们上篇文章已经提过了,作用就是让引用计数+1,底层原理就是weak_ptr.lock(),目的就是防止这个指针因为被调度器拿到离开本作用域而计数-1变成0,直接析构销毁,再要运行的时候直接报错。而我们+1就很好的避免了这个问题,只有这个协程完全运行结束,我们来手动删除(这个后面会有)。

然后就是创建主线程,我们创建主线程用的是Fiber(),在这里面设置了t_fiber = main_fiber.get()

// 正在运行的协程 static thread_local Fiber* t_fiber = nullptr; // 主协程 static thread_local std::shared_ptr<Fiber> t_thread_fiber = nullptr; // 调度器协程 static thread_local Fiber* t_scheduler_fiber = nullptr; // 协程的ID计数器 static std::atomic<uint64_t> s_fiber_id{0}; // 活跃协程数量计数器 static std::atomic<uint64_t> s_fiber_count{0}; void Fiber::SetThis(Fiber* fiber) { t_fiber = fiber; } // 运行该函数,创建主协程 std::shared_ptr<Fiber> Fiber::GetThis() { if (t_fiber) { return t_fiber->shared_from_this(); // 主要是为了让引用计数+1 } std::shared_ptr<Fiber> main_fiber(new Fiber()); // 这里面设置了t_fiber = main_fiber.get() t_thread_fiber = main_fiber; t_scheduler_fiber = main_fiber.get(); // 除非主动设置,否则主协程默认为调度协程 assert(t_fiber == main_fiber.get()); return t_fiber->shared_from_this(); }

接下来就是构造主协程和子协程的函数,如果是构建主协程,说明这个是第一个协程,我们要设置状态为RUNNING,然后设置为当前运行的协程,并获取当前的上下文,然后把这个协程的信息设置一下。

创建子协程,状态时READY,因为创建不代表可以运行,因为子协程是要真正工作的,所以我们先分配栈的大小,然后我们先获取上下文,然后对这个上下文进行修改,最后用makecontext对这个上下文完成替换工作,绑定要执行的函数,等待之后的激活

// 作用:创建主协程,设置状态,初始化上下文,并分配ID Fiber::Fiber() { SetThis(this); // 在GetThis中使用了无参的构造函数,这里相当于把this(main_fiber)的指针地址赋值给了t_fiber m_state = RUNNING; if (getcontext(&m_ctx)) { std::cerr << "Fiber() failed\n"; pthread_exit(NULL); } m_id = s_fiber_id++; s_fiber_count++; if (debug) { std::cout << "Fiber():main id = " << m_id << std::endl; } } /* 作用:创建一个新的协程,初始化回调函数,栈的大小和状态,分配栈空间,并通过make修改上下文当set或swap激活ucontext_t的时候 m_ctx上下文就会执行make的第二个参数的函数 */ Fiber::Fiber(std::function<void()> func, size_t stack_size, bool run_in_scheduler) : m_func(func), m_runInScheduler(run_in_scheduler) { m_state = READY; // 分配栈空间 m_stack_size = stack_size ? stack_size : 128000; m_stack = malloc(m_stack_size); if (getcontext(&m_ctx)) { std::cerr << "Fiber(std::function<void()> func, size_t stack_size, bool run_in_scheduler) failed\n"; pthread_exit(NULL); } m_ctx.uc_link = nullptr; // 因为这里没有设置后继所以在运行完mainfunc后会返回到主协程 m_ctx.uc_stack.ss_sp = m_stack; // 地址 m_ctx.uc_stack.ss_size = m_stack_size; // 大小 makecontext(&m_ctx, &Fiber::MainFunc, 0); m_id = s_fiber_id++; s_fiber_count++; if (debug) { std::cout << "Fiber(std::function<void()> func, size_t stack_size, bool run_in_scheduler):id = " << m_id << std::endl; } }

对于一个协程的析构函数,s_fiber_count是一个全局变量,记录着所有的协程数量,所以在析构一个协程的时候要--,然后释放其对应的栈。

我们之所以要有reset()函数,主要是提高了效率,因为一个协程如果事情干完了,完全可以再重新分配任务,这样避免反复的开辟空间和销毁内存,我们直接用原来的栈,只需要传递一下新的函数即可,所以我们的操作就是给原来协程的m_func赋值新的函数,然后利用getcontext和makecontext修改上下文,绑定要执行的函数,等待之后的激活

Fiber::~Fiber() { s_fiber_count--; if (m_stack) { free(m_stack); } if (debug) { std::cout << "Fiber::~Fiber():id = " << m_id << std::endl; } } // 作用:重置协程的回调函数,并重新设置上下文,将协程从"TERM"状态转换为"READY"状态 void Fiber::reset(std::function<void()> func) { assert(m_state == TERM && m_stack); m_state = READY; m_func = func; if (getcontext(&m_ctx)) { std::cerr << "Fiber::reset(std::function<void()> func) failed\n"; pthread_exit(NULL); } m_ctx.uc_link = nullptr; m_ctx.uc_stack.ss_sp = m_stack; m_ctx.uc_stack.ss_size = m_stack_size; makecontext(&m_ctx, &Fiber::MainFunc, 0); }

接下来是两个关键的函数:

一个是resume():恢复运行状态,因为我们这个项目做了两手准备,一个是主协程来调度,一个是调度器单独是一个协程,所以我们特地用m_runInScheduler变量来记录我们的选择,如果为true,那么说明我们选择的是后者,不过无论是哪一种,因为协程的任务已经确定了,只需要我们激活,因为此时执行权要么在调度器那,要么在主协程那,所以我们用swapcontext()来切换上下文,执行我们当前的上下文

另一个是yield():暂停状态,首先这个函数是暂停不是销毁,所以我们的状态是READY,也就是可以再次resume(),然后逻辑和resume()的恰好相反,我们要把执行权返回给主协程或者调度器,用swapcontext(),同时要设施当前执行的协程是调度器或者主协程

// 作用:将协程的状态设置为running,并恢复协程的执行,如果m_runInScheduler为true,则将上下文切换到调度协程,否则切换到主线程的协程 void Fiber::resume() { m_state = RUNNING; if (m_runInScheduler) { SetThis(this); // 设置当前工作的协程为this if (swapcontext(&(t_scheduler_fiber->m_ctx), &m_ctx)) { std::cerr << "Fiber::resume() failed\n"; pthread_exit(NULL); } } else { SetThis(this); if (swapcontext(&(t_thread_fiber->m_ctx), &m_ctx)) { std::cerr << "Fiber::resume() failed\n"; pthread_exit(NULL); } } } void Fiber::yield() { assert(m_state == RUNNING || m_state == TERM); if (m_state != TERM) { m_state = READY; } if (m_runInScheduler) { SetThis(t_scheduler_fiber); if (swapcontext(&m_ctx, &(t_scheduler_fiber->m_ctx))) { std::cerr << "Fiber::yield() failed\n"; pthread_exit(NULL); } } else { SetThis (t_thread_fiber.get()); if (swapcontext(&m_ctx, &(t_thread_fiber->m_ctx))) { std::cerr << "Fiber::yield() failed\n"; pthread_exit(NULL); } } }

最后一个是我们执行的主函数,在代码里面我已经说明了一些细节,大家看一看。

主要就是对于计数+1的理解,我们是手动释放,也就是reset(),然后是把一个裸指针交给调度器完成最后的销毁任务

void Fiber::MainFunc() { std::shared_ptr<Fiber> cur = GetThis(); // GetThis()的shared_from_this的方法让引用计数加1,触发里面的if()判断 assert(cur != nullptr); cur->m_func(); // 执行函数 cur->m_func = nullptr; cur->m_state = TERM; // 函数执行完毕一定要设定为term状态,如果是ready说明之后会resume,但是term就会被调度器删除 // 运行完毕,让出执行权 /* 这里不可以是cur->yield(),因为如果我们先调用yield(),首先我们会进行swap,把cur的上下文全部写入ucp中,然后切换到主线程 主线程调度器删除cur,cur计数-1,这个时候cur的计数是1,因为cur的引用是在堆上,所以即使栈被删除了,cur这个指针也不会被删除 如果我们先reset(),因为raw_ptr是一个裸指针(get()得来的),在栈上,那么删除的时候就不会调用任何的析构函数,并且把主动权让给了调度器 当栈被删除的时候,这个裸指针也会被删除 */ auto raw_ptr = cur.get(); cur.reset(); raw_ptr->yield(); }

所以基于上述的代码我们也可以很轻松的写出一个简单的协程池,而这个协程池的关键就是利用了reset()这个函数,先创建协程,然后不断地利用这些已经创建好的协程

// 协程池 class FiberPool { public: FiberPool(size_t size) { for (size_t i = 0; i < size; i++) { fibers.push_back(std::make_shared<Fiber>(true)); } } std::shared_ptr<Fiber> getFiber(std::function<void()> func) { for (auto& fiber : fibers) { if (fiber->getState() == Fiber::READY) { fiber->reset(func); return fiber; } } auto new_fiber = std::make_shared<Fiber>(func); fibers.push_back(new_fiber); return new_fiber; } private: std::vector<std::shared_ptr<Fiber>> fibers; };

总结

本篇文章就到这里结束了!!!希望可以对大家有所帮助~~~

http://www.jsqmd.com/news/1108097/

相关文章:

  • 抖音评论采集终极指南:三步快速获取完整评论数据
  • 【VMware渗透实验室搭建指南】:20年安全专家亲授Kali Linux零基础部署+网络配置避坑清单
  • 别再重装了!Kali Linux VMware虚拟机性能优化7项黄金法则(含vSphere ESXi迁移适配指南)
  • 终极GitHub加速方案:Fast-GitHub让你的下载速度提升10倍以上
  • VLC点击暂停插件:如何通过鼠标点击控制视频播放
  • AI 工程师 / 架构师面试题集
  • YimMenu终极指南:免费GTA5增强工具完全使用教程
  • Amlogic设备Armbian系统实战部署与性能优化完整指南
  • 3分钟解决Windows苹果USB驱动问题:iPhone网络共享一键安装指南
  • 一键激活Windows和Office:KMS_VL_ALL_AIO智能解决方案详解
  • PCF80如何用于口腔组织中的成纤维细胞状态与免疫邻域分析?
  • 2026嘉兴黄金回收白银回收铂金回收旧料回收怎么选?五家高实价铂金白银线下门店测评清单 + 联系方式
  • 3分钟搞定抖音评论采集:零代码工具让你轻松获取完整数据
  • RustScan:3秒扫完6万多个端口的扫描器
  • VMware Workstation 17.x黑屏暴雷事件深度复盘:UEFI固件兼容性漏洞与补丁级修复方案(附官方KB编号)
  • 重构暗黑3操作体验:D3KeyHelper的自动化革命
  • 终极指南:使用KMS_VL_ALL_AIO智能脚本永久激活Windows与Office
  • 一文说清 C++指针与C#引用类型
  • 从报错0x0000007B到桌面秒进:VMware安装macOS的5个隐藏参数与BIOS级调优技巧(实测提升启动速度300%)
  • comfyui 文生图
  • 【VMware Tools安装故障终极指南】:20年虚拟化专家亲授5大核心原因与秒级修复方案
  • 从空间转录组到PCF80:母胎界面研究如何补充蛋白功能证据?
  • 简单三步:用鼠标点击控制VLC播放暂停的完整指南
  • 从WordPress插件命令注入到Kubernetes容器逃逸的完整渗透测试实战
  • Kali Linux在VMware中无法联网、显卡失灵、复制粘贴失效?(2024最新兼容性修复手册)
  • C语言 指针的理解 — 3
  • 虚假信息治理新范式:跨层协同人机耦合防御体系
  • Grafana 生产环境运维与排错:日志、权限与升级实战
  • Grafana 告警历史与复盘:使用 Loki 和 Tempo 追踪告警链路
  • 什么是选择自己适合的赛道?