当前位置: 首页 > news >正文

Marl纤程调度原理深度解析:实现高效协作式多任务处理

Marl纤程调度原理深度解析:实现高效协作式多任务处理

【免费下载链接】marlA hybrid thread / fiber task scheduler written in C++ 11项目地址: https://gitcode.com/gh_mirrors/ma/marl

Marl是一个用C++ 11编写的混合线程/纤程任务调度器,它结合了纤程和线程的优势,允许高效执行可能阻塞的任务,同时保持固定数量的硬件线程。本文将深入解析Marl纤程调度的核心原理,帮助开发者理解其如何实现高效协作式多任务处理。

什么是纤程?

纤程是一种轻量级的协作式线程,可以在显式的 yield 点暂停和恢复。由于目前没有标准的跨平台纤程或协程库,Marl实现了marl::OSFiber类来支持各平台,这些实现大多用汇编编写,用于保存和恢复被调用者保存的寄存器,并维护纤程栈的分配。marl::OSFiber是内部实现细节,并未在公共API中公开。

marl::Scheduler::Fiber是与marl::Scheduler紧密耦合的公共纤程接口,它具有简单的std::condition_variable类似接口。每个marl::Scheduler::Fiber都永久关联到一个marl::Scheduler::Worker,并且保证只能在用于暂停的同一线程上恢复。

Marl任务调度基础

任务定义与调度

marl::Taskstd::function<void()>的别名,即一个不接受参数也不返回值的函数。任务通过marl::schedule()进行调度,通常实现为lambda表达式:

marl::schedule([] { printf("Hello world!\n"); });

虽然marl::Task签名不接受参数,但通常会捕获变量作为任务的输入和输出。Marl的所有同步原语(marl::ConditionVariable除外)都持有内部状态的共享指针,建议按值捕获这些原语,以避免任务生命周期超过调用marl::schedule()的栈时导致内存损坏。

工作线程(Workers)

调度器包含多个marl::Scheduler::Worker,每个工作线程包含:

  • work.tasks- 待启动的任务队列
  • work.fibers- 已暂停但准备恢复的纤程队列
  • work.waiting- 等待恢复或超时的暂停纤程队列
  • work.num- 与work.tasks.size() + work.fibers.size()同步的计数器
  • work.numBlockedFibers- 记录当前在suspend()调用中阻塞的纤程数量
  • idleFibers- 准备重用的空闲纤程集合

当调用marl::schedule()调度任务时,会选择一个工作线程并将任务放入其work.tasks队列。工作线程的选择规则如下:

  • 如果调度器没有专用工作线程(marl::Scheduler::config().workerThreads.count == 0),则任务被排队到当前执行线程的单线程工作线程(Single-Threaded-Worker)
  • 否则选择一个多线程工作线程(Multi-Threaded-Worker)。如果有工作线程进入spin-for-work状态,则优先选择这些线程,否则以轮询方式选择多线程工作线程

纤程调度核心机制

工作线程运行循环(run())

run()是工作纤程的主要处理函数,由每个多线程工作线程的线程启动时调用,也会在所有其他纤程阻塞时从Worker::suspend()生成新的工作纤程时调用。

该函数调用runUntilShutdown(),进入以下循环:

  1. 调用waitForWork()阻塞直到有新的工作要处理
  2. 调用runUntilIdle()处理所有新任务和纤程。注意纤程可以在runUntilIdle()内部切换,因此run()的执行可能在单个线程的纤程之间跳转

此循环一直持续到工作线程完成所有工作并被告知关闭。一旦循环因工作线程被告知关闭而退出,mainFiber将恢复,处理其余的关闭逻辑。

运行直到空闲(runUntilIdle())

顾名思义,此函数执行工作直到没有更多工作或所有工作都被阻塞。其基本逻辑如下:

  1. 恢复所有未阻塞的任务(纤程)

    runUntilIdle()首先完成所有准备恢复(不再阻塞)的纤程。通过从work.fibers队列中获取一个纤程,将当前纤程放入idleFibers队列(此纤程被视为空闲,因为它正在寻找工作),并将上下文切换到获取的纤程。

    执行未阻塞的纤程优先于启动新任务,这是因为新任务可能会产生更多纤程,每个纤程都会消耗一定的内存(通常用于栈)。

  2. 开始执行新任务

    一旦所有可恢复的纤程都已完成或重新阻塞,就从work.tasks队列中获取新任务并执行。任务完成后,控制权返回到runUntilIdle(),主循环从步骤1重新开始。

  3. 一旦没有更多纤程或任务可执行,runUntilIdle()返回。

等待工作(waitForWork())

当工作线程没有任务可启动且没有纤程可恢复时,调用waitForWork()阻塞直到工作线程有事情可做。

如果工作线程是多线程工作线程,waitForWork()首先进入spinForWork(),否则跳过此阶段。

然后waitForWork()等待以下任一情况发生后返回:

  • 纤程准备恢复,被加入work.fibers队列
  • 任务被加入work.tasks队列
  • work.waiting队列中的纤程超时
  • 工作线程被关闭

返回前,work.waiting队列中超时的纤程会自动移至work.fibers队列。

自旋等待工作(spinForWork())

spinForWork()有两个作用:

  1. 尝试从其他工作线程窃取工作,以保持工作负载均衡

    任务长度的持续时间可能有很大差异,随着时间的推移,一些工作线程可能会有大量的工作队列,而其他工作线程则处于饥饿状态。spinForWork()仅在工作线程饥饿时调用,并会尝试从随机选择的工作线程窃取任务。由于纤程只能在同一线程上执行,因此只能窃取任务,不能窃取纤程。

  2. 尝试避免将线程让渡给操作系统

    通常会有一个任务(提供者)向调度器调度许多小的子任务,这些子任务均匀地分配给工作线程(消费者)。这些消费者的数量通常超过提供者,很容易出现提供者难以提供足够的工作来使消费者完全占用的情况。

    在这种情况下,工作线程可能会进入一个循环:给它们一个任务,完成它,然后等待一小段时间以获取更多工作。当等待另一个任务时允许工作线程让渡给操作系统(例如使用std::condition_variable::wait())可能会导致性能损失。根据平台的不同,线程可能需要一毫秒或更长时间才能被操作系统恢复。这种长度的停顿可能导致整个任务依赖图的显著停顿。

spinForWork()包含一个运行短时间的循环,循环体执行以下操作:

  • 使用nops的紧凑循环使CPU保持忙碌,同时定期检查work.num以查看是否有新工作可用。如果发现新工作,spinForWork()立即返回。
  • 如果没有安排新工作,则尝试从另一个随机工作线程窃取任务。如果窃取成功,spinForWork()立即返回。
  • 如果窃取失败,则调用std::this_thread::yield()以防止Marl使操作系统饥饿。

暂停(suspend())

Marl允许任务阻塞,同时保持线程忙碌。

如果任务阻塞,则调用Scheduler::Worker::suspend()suspend()首先调用Scheduler::Worker::waitForWork(),阻塞直到有可执行的任务或纤程。然后发生以下情况之一:

  1. 如果有任何未阻塞的纤程,则从work.fibers队列中获取纤程并切换到该纤程。
  2. 如果有任何空闲纤程,则从idleFibers集合中获取一个并切换到该纤程。恢复后,这个空闲纤程将继续执行任务的角色。
  3. 如果以上情况都不发生,则需要创建一个新的纤程来继续执行任务。创建此纤程以开始在marl::Scheduler::Worker::run()中执行,并切换到该纤程。

在所有情况下,suspend()调用都会切换到另一个纤程。当被暂停的纤程恢复时,suspend()返回给调用者。

工作线程类型

工作线程分为单线程工作线程(Single-Threaded-Worker)和多线程工作线程(Multi-Threaded-Worker)两种类型。两种模式的大部分逻辑是相同的。

最显著的区别是多线程工作线程会生成一个专用的工作线程来调用marl::Scheduler::run(),而单线程工作线程只会在所有其他纤程阻塞时在新的纤程上调用marl::Scheduler::run()

单线程工作线程

每个通过调用marl::Scheduler::bind()绑定的线程都会创建一个单线程工作线程(STW)。

如果调度器没有专用工作线程(marl::Scheduler::config().workerThreads.count == 0),则调度的任务会排队到当前执行线程的STW。

由于在此模式下没有工作线程,STW上排队的任务不会自动在后台执行。相反,只有在调用marl::Scheduler::Worker::suspend()时才会执行任务。suspend()的逻辑对于STW和MTW是通用的,当所有其他纤程阻塞时,会生成调用marl::Scheduler::Worker::run()的新纤程。

void SingleThreadedWorkerExample() { marl::Scheduler::Config cfg; cfg.setWorkerThreadCount(0); // STW mode. marl::Scheduler scheduler(cfg); scheduler.bind(); defer(scheduler.unbind()); // Calling marl::schedule() enqueues the task on the STW, but does not // execute it until the thread is blocked. marl::Event done; marl::schedule([=] { done.signal(); }); // This is a blocking call. // marl::Event::wait() (indirectly) calls marl::Scheduler::Worker::suspend(). // marl::Scheduler::Worker::suspend() creates and switches to a fiber which // calls marl::Scheduler::Worker::run() to run all enqueued tasks. Once the // main fiber becomes unblocked, marl::Scheduler::Worker::runUntilIdle() will // switch back to the main fiber to continue execution of the application. done.wait(); }

多线程工作线程

marl::Scheduler使用正数量的工作线程(marl::Scheduler::Config::workerThread::count > 0)构造时,会创建多线程工作线程。

每个MTW与一个新的std::thread配对,该线程通过调用marl::Scheduler::Worker::run()开始。

当工作线程被告知关闭且所有工作完成后,marl::Scheduler::Worker::run()退出主处理循环,并切换回主线程纤程,从而结束std::thread

快速上手Marl

要开始使用Marl,首先需要克隆仓库:

git clone https://gitcode.com/gh_mirrors/ma/marl

然后可以参考examples/目录中的示例程序,如hello_task.cpp,了解如何创建调度器并调度任务。基本步骤如下:

  1. 创建使用所有可用逻辑处理器的Marl调度器
  2. 将调度器绑定到主线程
  3. 使用marl::schedule()调度任务
  4. 使用同步原语(如marl::WaitGroup)等待任务完成
  5. 确保在返回前解绑调度器

以下是一个简单的示例:

#include "marl/scheduler.h" #include "marl/waitgroup.h" int main() { // 创建使用所有逻辑处理器的Marl调度器 // 将此调度器绑定到主线程,以便我们可以调用marl::schedule() marl::Scheduler scheduler(marl::Scheduler::Config::allCores()); scheduler.bind(); defer(scheduler.unbind()); // 在返回前自动解绑 marl::WaitGroup saidHello(4); marl::Event sayHello; // 调度一些异步运行的任务 for (int i = 0; i < 4; i++) { // 每个任务将在4个工作线程之一上运行 marl::schedule([=] { // 等待信号再开始工作 sayHello.wait(); // 任务完成时递减WaitGroup计数器 defer(saidHello.done()); // 在任务中阻塞? // 调度器会为这个线程找到其他事情做。 marl::Thread::sleep(marl::milliseconds(10)); printf("Hello from task %d!\n", i); }); } sayHello.signal(); // 解除所有任务的阻塞 saidHello.wait(); // 等待所有任务完成 printf("All tasks said hello.\n"); // 所有任务都保证在调度器析构前完成。 return 0; }

注意事项

绑定调度器到外部创建的线程

为了调用marl::schedule(),调度器必须绑定到调用线程。在调用marl::schedule()之前未将调度器绑定到线程将导致未定义行为。

marl::Scheduler可以同时绑定到任意数量的线程,并且可以通过marl::Scheduler::get()从绑定的线程中检索调度器。

从一个线程传递调度器到另一个线程的典型方式是:

// 在一个线程中: marl::Scheduler scheduler; some_thread.start([scheduler_ptr = &scheduler] { // 将调度器绑定到新线程 scheduler_ptr->bind(); defer(scheduler_ptr->unbind()); // ... });

始终记住在终止线程之前解绑调度器。忘记解绑将导致marl::Scheduler析构函数无限期阻塞。

不要在Marl任务中使用外部阻塞调用

marl::Scheduler内部持有多个工作线程,这些线程将执行调度的任务。如果Marl任务在Marl同步原语上阻塞,Marl可以从阻塞的任务中让出,并继续执行其他调度的任务。

在Marl工作线程上调用非Marl阻塞函数将阻止该工作线程能够切换执行其他任务,直到阻塞函数返回。这些非Marl阻塞函数的示例包括:std::mutex::lock()std::condition_variable::wait()accept()

短时间的阻塞调用是可以接受的,例如获取互斥锁以访问数据结构。但是要注意,不要在持有std::mutex锁的情况下使用Marl阻塞调用——Marl任务可能会在持有锁的情况下让出,并阻止其他任务重新获取互斥锁。这种情况可能会导致死锁。

如果需要从Marl工作线程进行阻塞调用,可能希望使用marl::blocking_call(),它将生成一个新线程来执行调用,允许Marl工作线程继续处理其他调度的任务。

总结

Marl通过结合纤程和线程的优势,提供了一个高效的任务调度解决方案。其核心在于工作线程对任务和纤程的管理,包括运行循环、任务窃取、纤程暂停和恢复等机制。通过理解这些内部工作原理,开发者可以更好地利用Marl来构建高性能的并发应用程序。无论是单线程模式还是多线程模式,Marl都能有效地管理任务执行,确保线程利用率最大化,同时避免不必要的阻塞和上下文切换开销。

要深入了解更多细节,可以参考项目中的官方文档:docs/scheduler.md,以及源代码中的关键实现文件,如scheduler.cpp和fiber相关实现。

【免费下载链接】marlA hybrid thread / fiber task scheduler written in C++ 11项目地址: https://gitcode.com/gh_mirrors/ma/marl

创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考

http://www.jsqmd.com/news/766637/

相关文章:

  • 提升开发效率:用快马AI替代git搜索与整合,一键生成定制化管理后台
  • 常见精度及使用场景
  • VSCode医疗合规校验工具突然封测升级!2026.3.1起强制启用“患者数据血缘追踪”功能——你的遗留系统还能撑过下个季度吗?
  • Cat-Catch终极实战指南:5步快速精通网页资源嗅探
  • Firefox隐藏技巧:利用chrome文件夹和CSS,彻底改造你的新标签页与隐私浏览页
  • 为内部知识库问答系统接入 Taotoken 作为多模型推理后端
  • Python监控Claude API用量:进度条可视化与自动化成本管理
  • Android Studio项目导入就报错?手把手教你排查‘Please select Android SDK’的三种常见原因
  • League Akari:基于模块化架构的英雄联盟客户端工具箱技术解析
  • Awesome Diffusion Models in Medical Imaging:医学影像扩散模型完全入门指南
  • 从医学影像到游戏开发:用Python+VTK 9.3.0快速上手三维可视化(附完整代码)
  • AI规则引擎:动态管理提示词与工作流编排的工程实践
  • 2026年容器板切割厂家推荐榜/钢板零割,低合金板切割,高建板钢板切割,合金板钢板切割,优碳板钢板切割 - 品牌策略师
  • 不止于调参:用FreeMASTER Recorder在STM32上实现数据记录与触发上传
  • 为什么92%的工业IoT项目在Docker 27集群部署时失败?——附可直接投产的27套校验级部署代码
  • 中兴光猫终极管理指南:zteOnu一键开启工厂模式与永久Telnet的完整教程
  • 为 Hermes Agent 配置 Taotoken 自定义模型提供商
  • 如何在fastbook中实现自定义损失函数:从基础到实践的完整指南
  • 维普AIGC再次停服升级后查什么?毕业季降AI避坑指南与实操细节,建议收藏 - 殷念写论文
  • 基于MIRFS的无人机集群隐蔽网络时间同步联合战术信息分发系统【附代码】
  • 如何快速上手Bluge:10个实用索引技巧与最佳实践
  • 手把手调试MIPI DBI显示:用逻辑分析仪抓取Type A/B时序波形,快速定位花屏、闪屏问题
  • CookieCutter Web界面:图形化模板管理的终极解决方案
  • 为什么83%的银行容器平台在等保测评中栽在Docker 27?揭秘3类高频不合规配置及修复代码级方案
  • 公路表面裂缝目标检测数据集分享(适用于YOLO系列深度学习检测任务)
  • 告别IP被封!Python爬虫进阶:用itertools.cycle实现智能代理轮询,一天采集百万数据无压力
  • 如何快速上手S7.NET+:西门子PLC通信的终极.NET解决方案
  • 5个步骤扩展Cookiecutter项目模板功能:打造专属插件系统
  • AI-Media2Doc:本地部署的音视频智能处理与文档生成工具实践
  • 【RED-Net | NIPS 2016论文阅读】:对称跳跃连接的深度编解码图像复原网络