C++协程用法总结
C++协程用法总结
C++20 引入的协程(Coroutines)是一项重要的语言特性,它提供了一种编写异步、并发和惰性求值代码的全新范式。协程是一种可以暂停执行并在之后恢复的函数,这种能力使得开发者可以用同步的方式编写异步代码,极大地提升了代码的可读性和可维护性。
核心概念:无栈协程
C++的协程是无栈(Stackless)的-。这意味着协程的暂停与恢复不依赖操作系统内核栈,其状态(局部变量、暂停点等)都保存在一个动态分配的协程状态(coroutine state)对象中。因此,协程的切换开销极低-,它是由开发者(或库)在用户态进行调度的协作式(Cooperative)任务,而非操作系统调度的抢占式线程。
关键语法:三个新关键字
只要一个函数的定义中使用了以下任何一个关键字,它就是一个协程:
co_await:用于暂停当前协程的执行,直到等待的某个操作(如异步 I/O)完成。// 伪代码:异步读取数据,看起来像是同步调用 auto data = co_await async_read_some();co_yield:用于暂停执行并产生(yield)一个值给调用者,常用于实现生成器(Generator),可以产生一个值的序列。// 生成一个从0开始的无限整数序列 generator<int> iota(int start = 0) { while (true) { co_yield start++; } }co_return:用于结束协程的执行并返回一个最终值。// 返回一个整数的协程 task<int> compute_value() { int result = /* ... 一些计算 ... */; co_return result; }
协程的三大核心组件
协程的运作主要围绕三个相互关联的对象展开:
承诺对象(Promise Object):存在于协程内部,是协程与外界沟通的桥梁。它负责:
创建协程返回的外部对象(
get_return_object)。控制协程在开始时(
initial_suspend)和结束时(final_suspend)是否立即暂停。接收
co_return返回的值(return_value)或co_yield产生的值(yield_value)。处理协程内部抛出的未被捕获的异常(
unhandled_exception)。
协程句柄(Coroutine Handle):存在于协程外部,类型为
std::coroutine_handle<Promise>-,它是一个轻量级的、非拥有(non-owning)的句柄,用于从外部控制协程的生命周期。resume():恢复一个被暂停的协程,使其继续执行。destroy():销毁协程及其状态。done():检查协程是否已经执行完毕。可以通过
promise()成员访问其关联的承诺对象。
协程状态(Coroutine State):是一个内部、动态分配的对象,包含了协程运行所需的所有信息,如承诺对象、参数的拷贝、局部变量和暂停点等。
标准库支持:<coroutine>头文件
C++20 在<coroutine>头文件中提供了协程的基础设施:
std::coroutine_traits:一个 traits 类,用于从协程的返回类型和参数类型推导出其承诺对象的类型-。std::coroutine_handle:如前所述,是用于操控协程的核心句柄类模板。std::suspend_never与std::suspend_always:两个简单的等待体(Awaitable)。suspend_never:co_await它时不会挂起。suspend_always:co_await它时总是挂起。
它们常用于在initial_suspend和final_suspend中控制协程的启动和结束行为。
无操作协程(No-op Coroutines):提供
std::noop_coroutine_handle等类型,用于创建一个执行没有任何效果(resume/destroy 都是空操作)的协程句柄。
典型应用场景
异步 I/O 与网络编程:这是协程最主要的应用场景-。可以消除“回调地狱”,让异步代码变得和同步代码一样直观。
生成器(Generator):用于惰性生成序列,例如生成无限数列、读取大型文件的行等。
简化并发任务:可以更容易地编写并发任务,如
async/await模式-。事件驱动编程:在事件循环中,协程可以优雅地等待事件的发生-。
一个完整的自定义协程示例
以下是一个最简化的生成器(Generator)示例,展示了如何组合上述概念:
#include <coroutine> #include <iostream> // 1. 定义一个简单的生成器类型 template<typename T> struct Generator { // 2. 定义承诺对象类型 struct promise_type { T current_value; // 创建Generator对象 Generator get_return_object() { return Generator{std::coroutine_handle<promise_type>::from_promise(*this)}; } // 初始不挂起 auto initial_suspend() { return std::suspend_never{}; } // 最终挂起 auto final_suspend() noexcept { return std::suspend_always{}; } // 处理co_yield auto yield_value(T value) { current_value = value; return std::suspend_always{}; } // 处理co_return (void) void return_void() {} // 处理异常 void unhandled_exception() { std::terminate(); } }; // 3. 使用协程句柄 std::coroutine_handle<promise_type> handle; Generator(std::coroutine_handle<promise_type> h) : handle(h) {} ~Generator() { if (handle) handle.destroy(); } // 迭代器支持 bool next() { if (!handle || handle.done()) return false; handle.resume(); return !handle.done(); } T value() const { return handle.promise().current_value; } }; // 4. 使用协程 Generator<int> getNumbers(int start, int count) { for (int i = start; i < start + count; ++i) { co_yield i; // 产生值 } } int main() { auto gen = getNumbers(10, 3); while (gen.next()) { std::cout << gen.value() << " "; // 输出: 10 11 12 } return 0; }重要限制
协程不能使用变长实参(variadic arguments)、普通的
return语句或占位符返回类型(auto或 Concept)。constexpr函数、consteval函数、构造函数、析构函数和main函数不能是协程。
展望:C++26 的 Sender/Receiver
虽然 C++20 提供了协程的核心语言机制,但标准库尚未提供开箱即用的async/await支持。C++26 计划引入Sender/Receiver模型,旨在为异步操作提供一个通用的、可组合的接口-,这将进一步标准化和完善 C++ 的异步编程生态。
一个c++使用协程实现的tcp服务端、客户端demo
文件清单
| 文件 | 作用 |
|---|---|
| coro_net.hpp | 自实现 epoll + 协程调度核心(header-only) |
| server.cpp | echo 服务端 |
| client.cpp | echo 客户端 |
| Makefile | 构建 |
架构分层
┌─────────────────────────────────────────────────┐ │ 应用层:server() / session() / run_client() │ ← 协程业务 ├─────────────────────────────────────────────────┤ │ Awaitable 层:AsyncAccept/Connect/Read/Write │ ← co_await 接口 ├─────────────────────────────────────────────────┤ │ DetachedTask:fire-and-forget 协程返回类型 │ ← 协程生命周期 ├─────────────────────────────────────────────────┤ │ EventLoop:epoll + fd→coroutine_handle 调度器 │ ← 事件循环 └─────────────────────────────────────────────────┘核心设计要点
1. EventLoop 是调度中枢 ( coro_net.hpp:33 )
- arm(fd, events, handle) :把 fd 注册到 epoll,记下"fd → 协程句柄"映射
- run() : epoll_wait 醒来后,先 EPOLL_CTL_DEL 把 fd 从 epoll 摘掉,再 handle.resume() 恢复协程
- 关键顺序: 先摘掉再恢复 ——协程恢复后可能立刻再次 co_await 同一个 fd,避免重复注册
2. DetachedTask 实现 "spawn-and-forget" ( coro_net.hpp:80 )
std::suspend_never initial_suspend() { return {}; } // 创建即运行 std::suspend_never final_suspend() { return {}; } // 结束自动销毁帧这样新连接来时 session(cfd) 直接调用即可,协程立刻跑到第一个 co_await 才挂起返回,无需手动管理句柄生命周期。
3. Awaitable 三件套衔接 epoll 与协程
struct AsyncRead { int fd; char* buf; size_t len; bool await_ready() { return false; } void await_suspend(handle h) { EventLoop::arm(fd, EPOLLIN, h); } // 挂起+注册 ssize_t await_resume() { return ::read(fd, buf, len); } // 恢复时执行 };- - await_ready 永远 false → 一定挂起
- - await_suspend 把当前协程句柄交给 EventLoop,由 epoll 在 fd 就绪时回调
- - await_resume 在恢复后被调用,执行真正的 read/write/accept
4. 异步 connect 用 EINPROGRESS + EPOLLOUT + SO_ERROR ( coro_net.hpp:142 )
非阻塞 connect() 通常返回 EINPROGRESS ,等 fd 可写后用 getsockopt(SO_ERROR) 取真实错误码。
5. EAGAIN 处理:caller 重试,不在 await_resume 里 loop epoll 已 DEL 再恢复,理论上不会 EAGAIN,但偶发竞态时 caller 用 continue 重新 co_await ,会重新挂起+注册,不会忙等。
构建运行
cd coro_tcp_demo make # 同时构建 server 和 client # 终端 1 ./server 12345 # 终端 2 ./client 127.0.0.1 12345 hello echo: hello ^D实测结果(已跑通)
[client] connected to 127.0.0.1:12345 (fd=16) type lines, Ctrl-D to quit echo: hello echo: world echo: 协程测试UTF-8 字节流也正确透传。
局限性说明(demo 范围内有意省略)
- 单线程 :所有协程跑在一个 EventLoop 上,不利用多核
- 单 fd 单等待者 : pending_ 是 unordered_map<fd, handle> ,不支持多协程同时 await 同一 fd
- 无 when_all / Task<T> :DetachedTask 是 fire-and-forget,不能 await 子协程结果;要支持需另写 Task<T> + co_await 复合机制
- 客户端 stdin 阻塞 : std::getline 仍同步;要全异步需把 STDIN_FILENO 也注册进 epoll
- 无定时器 :要支持超时需在 epoll 里加 timerfd 或维护一个最小堆
这些都是生产级协程库(如 asio、libco、cocpp)会补齐的部分,但作为"理解 epoll × 协程如何粘合"的最小骨架,本 demo 已完整覆盖核心机制。
代码
coro_tcp_demo/coro_net.hpp
// coro_net.hpp — self-contained epoll + C++20 coroutine scheduler // // Design: // - EventLoop: single-threaded epoll wrapper, schedules coroutine resumption // - DetachedTask: fire-and-forget coroutine, starts immediately, self-cleans // - Async{Accept,Connect,Read,Write}: awaitables bridging epoll <-> coroutines // // No third-party deps. Linux only (epoll). #pragma once #include <sys/epoll.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> #include <fcntl.h> #include <errno.h> #include <string.h> #include <signal.h> #include <coroutine> #include <cstdio> #include <cstdlib> #include <cstdint> #include <cstring> #include <string> #include <stdexcept> #include <unordered_map> #include <iostream> namespace coro { // ---------------- EventLoop: epoll + coroutine scheduler ---------------- // // One fd can be awaited by at most one coroutine at a time (no multi-wait). // When epoll reports an fd ready, we DEL it from epoll before resuming, so // the coroutine must re-arm (co_await again) if it wants to wait once more. class EventLoop { public: static EventLoop& instance() { static EventLoop inst; return inst; } EventLoop() : epfd_(::epoll_create1(EPOLL_CLOEXEC)) { if (epfd_ < 0) throw std::runtime_error("epoll_create1 failed"); } ~EventLoop() { if (epfd_ >= 0) ::close(epfd_); } EventLoop(const EventLoop&) = delete; EventLoop& operator=(const EventLoop&) = delete; // Register fd for `events`; when ready, resume `h`. void arm(int fd, uint32_t events, std::coroutine_handle<> h) { epoll_event ev{}; ev.events = events; ev.data.fd = fd; bool exists = pending_.count(fd) > 0; if (::epoll_ctl(epfd_, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0 && errno != EEXIST) { throw std::runtime_error(std::string("epoll_ctl: ") + ::strerror(errno)); } pending_[fd] = h; } void run() { running_ = true; epoll_event events[64]; while (running_) { int n = ::epoll_wait(epfd_, events, 64, -1); if (n < 0) { if (errno == EINTR) continue; throw std::runtime_error("epoll_wait failed"); } for (int i = 0; i < n; ++i) { int fd = events[i].data.fd; auto it = pending_.find(fd); if (it == pending_.end()) continue; auto h = it->second; // Disarm BEFORE resume: coroutine may re-arm inside await_resume path ::epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, nullptr); pending_.erase(it); h.resume(); // Caution: h's frame may be destroyed here if final_suspend // returned suspend_never. Do NOT touch h after resume(). } } } void stop() { running_ = false; } private: int epfd_; bool running_ = false; std::unordered_map<int, std::coroutine_handle<>> pending_; }; // ---------------- DetachedTask: fire-and-forget ---------------- // // initial_suspend = suspend_never -> coroutine runs as soon as it's created // final_suspend = suspend_never -> frame auto-destroys when coroutine ends // Result: a "spawn-and-forget" coroutine; great for connection handlers. struct DetachedTask { struct promise_type { DetachedTask get_return_object() noexcept { return {}; } std::suspend_never initial_suspend() noexcept { return {}; } std::suspend_never final_suspend() noexcept { return {}; } void return_void() noexcept {} void unhandled_exception() { try { std::rethrow_exception(std::current_exception()); } catch (const std::exception& e) { std::cerr << "[coroutine] unhandled: " << e.what() << "\n"; } catch (...) { std::cerr << "[coroutine] unknown exception\n"; } } }; }; // ---------------- Awaitables ---------------- struct AsyncAccept { int listen_fd; bool await_ready() const noexcept { return false; } void await_suspend(std::coroutine_handle<> h) const { EventLoop::instance().arm(listen_fd, EPOLLIN, h); } int await_resume() const { sockaddr_in addr{}; socklen_t len = sizeof(addr); return ::accept4(listen_fd, (sockaddr*)&addr, &len, SOCK_NONBLOCK | SOCK_CLOEXEC); } }; struct AsyncConnect { int fd; bool await_ready() const noexcept { return false; } void await_suspend(std::coroutine_handle<> h) const { EventLoop::instance().arm(fd, EPOLLOUT, h); } int await_resume() const { int err = 0; socklen_t len = sizeof(err); ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len); return err; } }; struct AsyncRead { int fd; char* buf; size_t len; bool await_ready() const noexcept { return false; } void await_suspend(std::coroutine_handle<> h) const { EventLoop::instance().arm(fd, EPOLLIN, h); } ssize_t await_resume() const { return ::read(fd, buf, len); } }; struct AsyncWrite { int fd; const char* buf; size_t len; bool await_ready() const noexcept { return false; } void await_suspend(std::coroutine_handle<> h) const { EventLoop::instance().arm(fd, EPOLLOUT, h); } ssize_t await_resume() const { return ::write(fd, buf, len); } }; // ---------------- Socket helpers ---------------- inline int make_listener(uint16_t port) { int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); if (fd < 0) throw std::runtime_error("socket failed"); int yes = 1; ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); sockaddr_in addr{}; addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl(INADDR_ANY); addr.sin_port = htons(port); if (::bind(fd, (sockaddr*)&addr, sizeof(addr)) < 0) { ::close(fd); throw std::runtime_error(std::string("bind: ") + ::strerror(errno)); } if (::listen(fd, 128) < 0) { ::close(fd); throw std::runtime_error(std::string("listen: ") + ::strerror(errno)); } return fd; } inline int make_connector(const std::string& ip, uint16_t port) { int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); if (fd < 0) throw std::runtime_error("socket failed"); sockaddr_in addr{}; addr.sin_family = AF_INET; addr.sin_port = htons(port); if (::inet_pton(AF_INET, ip.c_str(), &addr.sin_addr) <= 0) { ::close(fd); throw std::runtime_error("invalid ip: " + ip); } int r = ::connect(fd, (sockaddr*)&addr, sizeof(addr)); if (r < 0 && errno != EINPROGRESS) { ::close(fd); throw std::runtime_error(std::string("connect: ") + ::strerror(errno)); } return fd; } } // namespace corocoro_tcp_demo/server.cpp
// server.cpp — echo server using coro_net.hpp // g++ -std=c++20 -O2 server.cpp -o server // ./server [port] #include "coro_net.hpp" using namespace coro; // One coroutine per connection. Echoes bytes back until peer closes. static DetachedTask session(int fd) { char buf[1024]; for (;;) { ssize_t n = co_await AsyncRead{fd, buf, sizeof(buf)}; if (n == 0) break; // EOF if (n < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) continue; break; // error } // Write all bytes back size_t off = 0; while (off < (size_t)n) { ssize_t w = co_await AsyncWrite{fd, buf + off, (size_t)n - off}; if (w < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) continue; if (w <= 0) goto done; // error or EOF off += (size_t)w; } } done: ::close(fd); std::cout << "[server] conn " << fd << " closed\n"; } // Acceptor loop: each accepted fd spawns a session() coroutine. static DetachedTask server(uint16_t port) { int lfd = make_listener(port); std::cout << "[server] listening on :" << port << " (fd=" << lfd << ")\n"; for (;;) { int cfd = co_await AsyncAccept{lfd}; if (cfd < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) continue; std::cerr << "[server] accept: " << ::strerror(errno) << "\n"; continue; } std::cout << "[server] accepted fd=" << cfd << "\n"; session(cfd); // fire-and-forget: runs until first co_await, then returns } } int main(int argc, char** argv) { ::signal(SIGPIPE, SIG_IGN); // writes to closed peer should return EPIPE, not kill us uint16_t port = (uint16_t)(argc > 1 ? std::atoi(argv[1]) : 12345); server(port); // DetachedTask starts immediately, suspends at first await EventLoop::instance().run(); // drives everything from here on return 0; }coro_tcp_demo/client.cpp
// client.cpp — echo client using coro_net.hpp // g++ -std=c++20 -O2 client.cpp -o client // ./client [ip] [port] #include "coro_net.hpp" using namespace coro; // Coroutine: async connect, then loop stdin -> send -> recv -> print. // NOTE: std::getline on stdin is blocking; for a single-connection client // this is fine. To be fully async on stdin too, register STDIN_FILENO // with the EventLoop via AsyncRead and a non-blocking termios setup. static DetachedTask run_client(const std::string& ip, uint16_t port) { int fd = make_connector(ip, port); int err = co_await AsyncConnect{fd}; if (err != 0) { std::cerr << "[client] connect failed: " << ::strerror(err) << "\n"; ::close(fd); co_return; } std::cout << "[client] connected to " << ip << ":" << port << " (fd=" << fd << ")\n" << "type lines, Ctrl-D to quit\n"; std::string line; while (std::getline(std::cin, line)) { line += "\n"; // Send all bytes const char* p = line.data(); size_t left = line.size(); while (left > 0) { ssize_t w = co_await AsyncWrite{fd, p, left}; if (w < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) continue; if (w <= 0) { std::cerr << "[client] write error\n"; goto out; } p += w; left -= (size_t)w; } // Read echo char buf[1024]; ssize_t n = co_await AsyncRead{fd, buf, sizeof(buf)}; if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) continue; if (n <= 0) { std::cerr << "[client] server closed\n"; goto out; } std::cout << "echo: " << std::string(buf, (size_t)n); } out: ::close(fd); std::cout << "[client] bye\n"; } int main(int argc, char** argv) { ::signal(SIGPIPE, SIG_IGN); std::string ip = argc > 1 ? argv[1] : "127.0.0.1"; uint16_t port = (uint16_t)(argc > 2 ? std::atoi(argv[2]) : 12345); run_client(ip, port); EventLoop::instance().run(); return 0; }coro_tcp_demo/Makefile
CXX ?= g++ CXXFLAGS ?= -std=c++20 -O2 -Wall -Wextra all: server client server: server.cpp coro_net.hpp $(CXX) $(CXXFLAGS) server.cpp -o server client: client.cpp coro_net.hpp $(CXX) $(CXXFLAGS) client.cpp -o client clean: rm -f server client .PHONY: all clean