Marl纤程调度原理深度解析:实现高效协作式多任务处理
Marl纤程调度原理深度解析:实现高效协作式多任务处理
【免费下载链接】marlA hybrid thread / fiber task scheduler written in C++ 11项目地址: https://gitcode.com/gh_mirrors/ma/marl
Marl是一个用C++ 11编写的混合线程/纤程任务调度器,它结合了纤程和线程的优势,允许高效执行可能阻塞的任务,同时保持固定数量的硬件线程。本文将深入解析Marl纤程调度的核心原理,帮助开发者理解其如何实现高效协作式多任务处理。
什么是纤程?
纤程是一种轻量级的协作式线程,可以在显式的 yield 点暂停和恢复。由于目前没有标准的跨平台纤程或协程库,Marl实现了marl::OSFiber类来支持各平台,这些实现大多用汇编编写,用于保存和恢复被调用者保存的寄存器,并维护纤程栈的分配。marl::OSFiber是内部实现细节,并未在公共API中公开。
marl::Scheduler::Fiber是与marl::Scheduler紧密耦合的公共纤程接口,它具有简单的std::condition_variable类似接口。每个marl::Scheduler::Fiber都永久关联到一个marl::Scheduler::Worker,并且保证只能在用于暂停的同一线程上恢复。
Marl任务调度基础
任务定义与调度
marl::Task是std::function<void()>的别名,即一个不接受参数也不返回值的函数。任务通过marl::schedule()进行调度,通常实现为lambda表达式:
marl::schedule([] { printf("Hello world!\n"); });虽然marl::Task签名不接受参数,但通常会捕获变量作为任务的输入和输出。Marl的所有同步原语(marl::ConditionVariable除外)都持有内部状态的共享指针,建议按值捕获这些原语,以避免任务生命周期超过调用marl::schedule()的栈时导致内存损坏。
工作线程(Workers)
调度器包含多个marl::Scheduler::Worker,每个工作线程包含:
work.tasks- 待启动的任务队列work.fibers- 已暂停但准备恢复的纤程队列work.waiting- 等待恢复或超时的暂停纤程队列work.num- 与work.tasks.size() + work.fibers.size()同步的计数器work.numBlockedFibers- 记录当前在suspend()调用中阻塞的纤程数量idleFibers- 准备重用的空闲纤程集合
当调用marl::schedule()调度任务时,会选择一个工作线程并将任务放入其work.tasks队列。工作线程的选择规则如下:
- 如果调度器没有专用工作线程(
marl::Scheduler::config().workerThreads.count == 0),则任务被排队到当前执行线程的单线程工作线程(Single-Threaded-Worker) - 否则选择一个多线程工作线程(Multi-Threaded-Worker)。如果有工作线程进入spin-for-work状态,则优先选择这些线程,否则以轮询方式选择多线程工作线程
纤程调度核心机制
工作线程运行循环(run())
run()是工作纤程的主要处理函数,由每个多线程工作线程的线程启动时调用,也会在所有其他纤程阻塞时从Worker::suspend()生成新的工作纤程时调用。
该函数调用runUntilShutdown(),进入以下循环:
- 调用
waitForWork()阻塞直到有新的工作要处理 - 调用
runUntilIdle()处理所有新任务和纤程。注意纤程可以在runUntilIdle()内部切换,因此run()的执行可能在单个线程的纤程之间跳转
此循环一直持续到工作线程完成所有工作并被告知关闭。一旦循环因工作线程被告知关闭而退出,mainFiber将恢复,处理其余的关闭逻辑。
运行直到空闲(runUntilIdle())
顾名思义,此函数执行工作直到没有更多工作或所有工作都被阻塞。其基本逻辑如下:
恢复所有未阻塞的任务(纤程)
runUntilIdle()首先完成所有准备恢复(不再阻塞)的纤程。通过从work.fibers队列中获取一个纤程,将当前纤程放入idleFibers队列(此纤程被视为空闲,因为它正在寻找工作),并将上下文切换到获取的纤程。执行未阻塞的纤程优先于启动新任务,这是因为新任务可能会产生更多纤程,每个纤程都会消耗一定的内存(通常用于栈)。
开始执行新任务
一旦所有可恢复的纤程都已完成或重新阻塞,就从
work.tasks队列中获取新任务并执行。任务完成后,控制权返回到runUntilIdle(),主循环从步骤1重新开始。一旦没有更多纤程或任务可执行,
runUntilIdle()返回。
等待工作(waitForWork())
当工作线程没有任务可启动且没有纤程可恢复时,调用waitForWork()阻塞直到工作线程有事情可做。
如果工作线程是多线程工作线程,waitForWork()首先进入spinForWork(),否则跳过此阶段。
然后waitForWork()等待以下任一情况发生后返回:
- 纤程准备恢复,被加入
work.fibers队列 - 任务被加入
work.tasks队列 work.waiting队列中的纤程超时- 工作线程被关闭
返回前,work.waiting队列中超时的纤程会自动移至work.fibers队列。
自旋等待工作(spinForWork())
spinForWork()有两个作用:
尝试从其他工作线程窃取工作,以保持工作负载均衡
任务长度的持续时间可能有很大差异,随着时间的推移,一些工作线程可能会有大量的工作队列,而其他工作线程则处于饥饿状态。
spinForWork()仅在工作线程饥饿时调用,并会尝试从随机选择的工作线程窃取任务。由于纤程只能在同一线程上执行,因此只能窃取任务,不能窃取纤程。尝试避免将线程让渡给操作系统
通常会有一个任务(提供者)向调度器调度许多小的子任务,这些子任务均匀地分配给工作线程(消费者)。这些消费者的数量通常超过提供者,很容易出现提供者难以提供足够的工作来使消费者完全占用的情况。
在这种情况下,工作线程可能会进入一个循环:给它们一个任务,完成它,然后等待一小段时间以获取更多工作。当等待另一个任务时允许工作线程让渡给操作系统(例如使用
std::condition_variable::wait())可能会导致性能损失。根据平台的不同,线程可能需要一毫秒或更长时间才能被操作系统恢复。这种长度的停顿可能导致整个任务依赖图的显著停顿。
spinForWork()包含一个运行短时间的循环,循环体执行以下操作:
- 使用
nops的紧凑循环使CPU保持忙碌,同时定期检查work.num以查看是否有新工作可用。如果发现新工作,spinForWork()立即返回。 - 如果没有安排新工作,则尝试从另一个随机工作线程窃取任务。如果窃取成功,
spinForWork()立即返回。 - 如果窃取失败,则调用
std::this_thread::yield()以防止Marl使操作系统饥饿。
暂停(suspend())
Marl允许任务阻塞,同时保持线程忙碌。
如果任务阻塞,则调用Scheduler::Worker::suspend()。suspend()首先调用Scheduler::Worker::waitForWork(),阻塞直到有可执行的任务或纤程。然后发生以下情况之一:
- 如果有任何未阻塞的纤程,则从
work.fibers队列中获取纤程并切换到该纤程。 - 如果有任何空闲纤程,则从
idleFibers集合中获取一个并切换到该纤程。恢复后,这个空闲纤程将继续执行任务的角色。 - 如果以上情况都不发生,则需要创建一个新的纤程来继续执行任务。创建此纤程以开始在
marl::Scheduler::Worker::run()中执行,并切换到该纤程。
在所有情况下,suspend()调用都会切换到另一个纤程。当被暂停的纤程恢复时,suspend()返回给调用者。
工作线程类型
工作线程分为单线程工作线程(Single-Threaded-Worker)和多线程工作线程(Multi-Threaded-Worker)两种类型。两种模式的大部分逻辑是相同的。
最显著的区别是多线程工作线程会生成一个专用的工作线程来调用marl::Scheduler::run(),而单线程工作线程只会在所有其他纤程阻塞时在新的纤程上调用marl::Scheduler::run()。
单线程工作线程
每个通过调用marl::Scheduler::bind()绑定的线程都会创建一个单线程工作线程(STW)。
如果调度器没有专用工作线程(marl::Scheduler::config().workerThreads.count == 0),则调度的任务会排队到当前执行线程的STW。
由于在此模式下没有工作线程,STW上排队的任务不会自动在后台执行。相反,只有在调用marl::Scheduler::Worker::suspend()时才会执行任务。suspend()的逻辑对于STW和MTW是通用的,当所有其他纤程阻塞时,会生成调用marl::Scheduler::Worker::run()的新纤程。
void SingleThreadedWorkerExample() { marl::Scheduler::Config cfg; cfg.setWorkerThreadCount(0); // STW mode. marl::Scheduler scheduler(cfg); scheduler.bind(); defer(scheduler.unbind()); // Calling marl::schedule() enqueues the task on the STW, but does not // execute it until the thread is blocked. marl::Event done; marl::schedule([=] { done.signal(); }); // This is a blocking call. // marl::Event::wait() (indirectly) calls marl::Scheduler::Worker::suspend(). // marl::Scheduler::Worker::suspend() creates and switches to a fiber which // calls marl::Scheduler::Worker::run() to run all enqueued tasks. Once the // main fiber becomes unblocked, marl::Scheduler::Worker::runUntilIdle() will // switch back to the main fiber to continue execution of the application. done.wait(); }多线程工作线程
当marl::Scheduler使用正数量的工作线程(marl::Scheduler::Config::workerThread::count > 0)构造时,会创建多线程工作线程。
每个MTW与一个新的std::thread配对,该线程通过调用marl::Scheduler::Worker::run()开始。
当工作线程被告知关闭且所有工作完成后,marl::Scheduler::Worker::run()退出主处理循环,并切换回主线程纤程,从而结束std::thread。
快速上手Marl
要开始使用Marl,首先需要克隆仓库:
git clone https://gitcode.com/gh_mirrors/ma/marl然后可以参考examples/目录中的示例程序,如hello_task.cpp,了解如何创建调度器并调度任务。基本步骤如下:
- 创建使用所有可用逻辑处理器的Marl调度器
- 将调度器绑定到主线程
- 使用
marl::schedule()调度任务 - 使用同步原语(如
marl::WaitGroup)等待任务完成 - 确保在返回前解绑调度器
以下是一个简单的示例:
#include "marl/scheduler.h" #include "marl/waitgroup.h" int main() { // 创建使用所有逻辑处理器的Marl调度器 // 将此调度器绑定到主线程,以便我们可以调用marl::schedule() marl::Scheduler scheduler(marl::Scheduler::Config::allCores()); scheduler.bind(); defer(scheduler.unbind()); // 在返回前自动解绑 marl::WaitGroup saidHello(4); marl::Event sayHello; // 调度一些异步运行的任务 for (int i = 0; i < 4; i++) { // 每个任务将在4个工作线程之一上运行 marl::schedule([=] { // 等待信号再开始工作 sayHello.wait(); // 任务完成时递减WaitGroup计数器 defer(saidHello.done()); // 在任务中阻塞? // 调度器会为这个线程找到其他事情做。 marl::Thread::sleep(marl::milliseconds(10)); printf("Hello from task %d!\n", i); }); } sayHello.signal(); // 解除所有任务的阻塞 saidHello.wait(); // 等待所有任务完成 printf("All tasks said hello.\n"); // 所有任务都保证在调度器析构前完成。 return 0; }注意事项
绑定调度器到外部创建的线程
为了调用marl::schedule(),调度器必须绑定到调用线程。在调用marl::schedule()之前未将调度器绑定到线程将导致未定义行为。
marl::Scheduler可以同时绑定到任意数量的线程,并且可以通过marl::Scheduler::get()从绑定的线程中检索调度器。
从一个线程传递调度器到另一个线程的典型方式是:
// 在一个线程中: marl::Scheduler scheduler; some_thread.start([scheduler_ptr = &scheduler] { // 将调度器绑定到新线程 scheduler_ptr->bind(); defer(scheduler_ptr->unbind()); // ... });始终记住在终止线程之前解绑调度器。忘记解绑将导致marl::Scheduler析构函数无限期阻塞。
不要在Marl任务中使用外部阻塞调用
marl::Scheduler内部持有多个工作线程,这些线程将执行调度的任务。如果Marl任务在Marl同步原语上阻塞,Marl可以从阻塞的任务中让出,并继续执行其他调度的任务。
在Marl工作线程上调用非Marl阻塞函数将阻止该工作线程能够切换执行其他任务,直到阻塞函数返回。这些非Marl阻塞函数的示例包括:std::mutex::lock()、std::condition_variable::wait()、accept()。
短时间的阻塞调用是可以接受的,例如获取互斥锁以访问数据结构。但是要注意,不要在持有std::mutex锁的情况下使用Marl阻塞调用——Marl任务可能会在持有锁的情况下让出,并阻止其他任务重新获取互斥锁。这种情况可能会导致死锁。
如果需要从Marl工作线程进行阻塞调用,可能希望使用marl::blocking_call(),它将生成一个新线程来执行调用,允许Marl工作线程继续处理其他调度的任务。
总结
Marl通过结合纤程和线程的优势,提供了一个高效的任务调度解决方案。其核心在于工作线程对任务和纤程的管理,包括运行循环、任务窃取、纤程暂停和恢复等机制。通过理解这些内部工作原理,开发者可以更好地利用Marl来构建高性能的并发应用程序。无论是单线程模式还是多线程模式,Marl都能有效地管理任务执行,确保线程利用率最大化,同时避免不必要的阻塞和上下文切换开销。
要深入了解更多细节,可以参考项目中的官方文档:docs/scheduler.md,以及源代码中的关键实现文件,如scheduler.cpp和fiber相关实现。
【免费下载链接】marlA hybrid thread / fiber task scheduler written in C++ 11项目地址: https://gitcode.com/gh_mirrors/ma/marl
创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考
