zmq源码分析之io_thread_t
文章目录
- 概述
- 继承关系
- 核心成员
- 构造函数
- 启动与停止
- 启动
- 停止
- 事件处理
- 读事件处理(核心)
- 其他事件(理论上不会被调用)
- 停止处理
- 架构图
- 事件循环流程
- 与其他组件的关系
- 线程创建流程
- 关键设计点
- 命令处理类型
- 性能特点
- 总结
概述
io_thread_t是 ZeroMQ 中的I/O线程,负责处理网络事件和命令传递。它是 ZeroMQ 多线程架构的核心组件,运行独立的事件循环来监听和处理各种文件描述符上的I/O事件。io_thread_t通过poller_t间接的持有线程,同时io_thread_t自身也持有了一个邮箱,用于接收外部命令,邮箱本身也是个文件描述符,io_thread_t自身的io事件也交给了poller_t管理。
继承关系
classio_thread_tZMQ_FINAL:publicobject_t,publici_poll_events- object_t:提供命令处理能力和对象标识
- i_poll_events:提供I/O事件回调接口
核心成员
private:// I/O线程通过这个邮箱访问传入的命令mailbox_t _mailbox;// 与邮箱文件描述符关联的句柄poller_t::handle_t _mailbox_handle;// I/O多路复用使用poller对象实现poller_t*_poller;构造函数
zmq::io_thread_t::io_thread_t(ctx_t*ctx_,uint32_ttid_):object_t(ctx_,tid_),_mailbox_handle(static_cast<poller_t::handle_t>(NULL)){// 创建poller对象_poller=new(std::nothrow)poller_t(*ctx_);alloc_assert(_poller);// 将邮箱的文件描述符注册到pollerif(_mailbox.get_fd()!=retired_fd){_mailbox_handle=_poller->add_fd(_mailbox.get_fd(),this);_poller->set_pollin(_mailbox_handle);}}关键操作:
- 创建平台特定的poller(epoll/kqueue/poll/select)
- 将邮箱的文件描述符注册到poller
- 设置监听读事件(POLLIN)
启动与停止
启动
voidzmq::io_thread_t::start(){charname[16]="";snprintf(name,sizeof(name),"IO/%u",get_tid()-zmq::ctx_t::reaper_tid-1);// 启动底层的I/O线程_poller->start(name);}停止
voidzmq::io_thread_t::stop(){send_stop();}事件处理
读事件处理(核心)
voidzmq::io_thread_t::in_event(){// 从邮箱接收命令并处理command_t cmd;intrc=_mailbox.recv(&cmd,0);while(rc==0||errno==EINTR){if(rc==0)cmd.destination->process_command(cmd);rc=_mailbox.recv(&cmd,0);}errno_assert(rc!=0&&errno==EAGAIN);}工作流程:
- 从邮箱接收命令
- 循环处理所有可用命令
- 直到邮箱为空(返回 EAGAIN)
其他事件(理论上不会被调用)
voidzmq::io_thread_t::out_event(){// 永远不会在这里轮询POLLOUTzmq_assert(false);}voidzmq::io_thread_t::timer_event(int){// 这里没有定时器,永远不会被调用zmq_assert(false);}停止处理
voidzmq::io_thread_t::process_stop(){zmq_assert(_mailbox_handle);_poller->rm_fd(_mailbox_handle);_poller->stop();}架构图
┌─────────────────────────────────────────────────────────────────────┐ │ io_thread_t │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ object_t (基类) │ │ │ │ - ctx_t * 上下文指针 │ │ │ │ - uint32_t tid 线程ID │ │ │ │ - process_command() 命令处理分发 │ │ │ └─────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ i_poll_events (接口) │ │ │ │ + in_event() 文件描述符可读回调 │ │ │ │ + out_event() 文件描述符可写回调 │ │ │ │ + timer_event() 定时器到期回调 │ │ │ └─────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ │ │ mailbox_t │ │ poller_t │ │mailbox_handle│ │ │ │ │ │ (命令队列) │ │ (事件循环) │ │ (fd句柄) │ │ │ │ │ └──────┬──────┘ └──────┬──────┘ └─────────────┘ │ │ │ │ │ │ │ │ │ └─────────┼──────────────────┼───────────────────────────────┘ │ │ │ │ │ │ ▼ ▼ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ 物理线程 (Poller 事件循环) │ │ │ │ ┌───────────────────────────────────────────────────────┐ │ │ │ │ │ poll/epoll/kqueue/select │ │ │ │ │ │ │ │ │ │ │ │ │ └──> in_event() ──> 处理命令 ──> process_command() │ │ │ │ │ └───────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────┘事件循环流程
┌─────────────────────────────────────────────────────────────────────┐ │ IO 线程事件循环 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ ┌───────────────┐ │ │ │ Poller 等待 │ │ │ │ 事件 │ │ │ └───────┬───────┘ │ │ │ │ │ │ 邮箱fd可读 │ │ ▼ │ │ ┌───────────────┐ │ │ │ 调用in_event() │ │ │ └───────┬───────┘ │ │ │ │ │ ▼ │ │ ┌───────────────┐ ┌───────────────┐ │ │ │ mailbox.recv()│────>│ 有命令 │──── 处理命令 │ │ └───────┬───────┘ └───────────────┘ │ │ │ │ │ │ EAGAIN (无命令) │ │ ▼ │ │ ┌───────────────┐ │ │ │ 返回继续等待 │ │ │ └───────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────┘与其他组件的关系
┌─────────────────────────────────────────────────────────────────────┐ │ ctx_t (上下文) │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ _io_threads: IO线程池 │ │ │ │ ┌────────┐ ┌────────┐ ┌────────┐ │ │ │ │ │IO[0] │ │IO[1] │ │IO[2] │ ... │ │ │ │ └───────┘ └───────┘ └───────┘ │ │ │ │ │ │ │ │ │ │ └───────┼─────────┼─────────┼─────────────────────────────────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ Slot 数组 │ │ │ │ [term] [reaper] [IO[0]] [IO[1]] [IO[2]] [socket1] ... │ │ │ └─────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────────────┐ │ Session/Engine │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ 网络连接 (TCP/UDP/IPC/PGM) │ │ │ │ │ │ │ │ │ │ 注册到 IO 线程的 Poller │ │ │ │ ▼ │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ 当网络数据可读/可写时,Poller 回调 io_thread_t │ │ │ │ │ │ → in_event() / out_event() │ │ │ │ │ │ → 处理网络数据 │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────────────┐ │ Socket (应用线程) │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ 用户 API 调用 (send/recv/connect/bind) │ │ │ │ │ │ │ │ │ │ 发送命令到 IO 线程的 mailbox │ │ │ │ ▼ │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ mailbox.send(command) │ │ │ │ │ │ → _signaler.send() (如果需要) │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────┘线程创建流程
上下文创建时:
- 创建 IO 线程数组
_io_threads - 为每个 IO 线程分配 slot
- 创建 IO 线程数组
IO 线程构造:
- 创建 mailbox
- 创建 poller
- 注册 mailbox fd 到 poller
IO 线程启动:
- 调用
start()启动 poller - Poller 创建物理线程
- 线程运行事件循环
- 调用
关键设计点
| 设计点 | 说明 |
|---|---|
| 独立线程 | 每个 IO 线程运行在独立物理线程上 |
| 事件驱动 | 基于 poll/epoll/kqueue/select |
| 命令处理 | 通过 mailbox 接收命令 |
| 多路复用 | 一个 poller 监听多个 fd |
| 负载均衡 | ctx 选择负载最小的 IO 线程 |
命令处理类型
IO 线程可以处理多种命令:
stop- 停止 IO 线程plug- 注册到 IO 线程bind- 绑定 pipe 到 socketactivate_read/write- pipe 流量控制hiccup- pipe 中断处理- 等等…
性能特点
- 高效事件处理:基于内核级 I/O 多路复用
- 低延迟:事件驱动,无轮询开销
- 可扩展:支持多个 IO 线程
- 线程安全:每个 IO 线程独立运行
总结
io_thread_t是 ZeroMQ 多线程架构的核心组件:
- 独立运行:每个 IO 线程运行在独立物理线程上
- 事件驱动:通过 poller 监听文件描述符
- 命令通道:通过 mailbox 接收来自其他线程的命令
- 网络处理:负责处理所有网络 I/O 事件
- 负载均衡:ctx 选择最适合的 IO 线程处理连接
这是 ZeroMQ 能够高效处理高并发连接的关键机制!
