当前位置: 首页 > news >正文

testserver.cc测试例子解读

一、介绍

这个测试例子是基于muduo-core 实现的 Echo 服务器(客户端发什么,服务器原样返回),核心遵循「Reactor 模型(Multi-Reactor)」:

  • mainLoop(主线程 EventLoop)负责监听新连接(Acceptor);
  • subLoop(线程池中的 EventLoop)负责处理已连接客户端的读写事件;
  • 所有回调通过「函数对象 + 智能指针」解耦,事件驱动核心是EventLoop::loop()的 epoll_wait 循环。

二、每一行发生了什么

1、EventLoop loop;

  • 初始化epoll_fd(EpollPoller),用于监听事件;
  • 初始化wakeup_fd(eventfd),用于线程间唤醒(subLoop 唤醒 mainLoop,或反之);
  • 标记threadId_为当前主线程 ID(保证 One Loop Per Thread)

2、InetAddress addr(8080);

运行效果:初始化服务端监听地址,默认 IP 为127.0.0.1,端口 8080(网络字节序)。

  • 调用InetAddress(uint16_t port, std::string ip = "127.0.0.1")构造函数;
  • 填充sockaddr_in结构体:sin_family=AF_INETsin_port=htons(8080)sin_addr=inet_addr("127.0.0.1")

3、EchoServer server(&loop, addr, "EchoServer");

运行效果:初始化 EchoServer 对象,核心做 3 件事:

  1. 初始化TcpServer对象(关联 mainLoop、监听地址、服务器名称);
  2. 注册连接回调(onConnection)到TcpServer;
  3. 注册消息回调(onMessage)到 TcpServer;
  4. 设置 subLoop 线程数为 3(Multi-Reactor 模式)。

子步骤 1:TcpServer 初始化(EchoServer 构造函数server_(loop, addr, name)

调用TcpServer构造函数(TcpServer.cc 18-35 行):

  • 校验 mainLoop 非空;
  • 初始化Acceptor对象(负责监听新连接,绑定 mainLoop);
  • 初始化EventLoopThreadPool(线程池,关联 mainLoop);
  • Acceptor注册newConnection回调(TcpServer::newConnection)这是新连接的核心入口。

子步骤 2:注册连接回调(onConnection)

子步骤 3:注册消息回调(onMessage)

子步骤4:设置subLoop线程数

4、server.start();

运行效果:启动服务器,核心做 2 件事:

  1. 启动EventLoopThreadPool(创建 3 个 subLoop 线程);
  2. Acceptor开始监听 8080 端口(mainLoop 监听新连接)。

子步骤 1:启动线程池

关联代码

void EventLoopThreadPool::start(const ThreadInitCallback &cb) { running_ = true; for (int i = 0; i < numThreads_; ++i) { // 创建线程,运行threadFunc std::string name = name_ + std::to_string(i + 1); threads_.emplace_back(new Thread( std::bind(&EventLoopThreadPool::threadFunc, this, cb), name)); threads_[i]->start(); } } // 线程函数:每个线程创建一个EventLoop,进入loop循环 void EventLoopThreadPool::threadFunc(const ThreadInitCallback &cb) { EventLoop loop; if (cb) cb(&loop); { MutexLockGuard lock(mutex_); loops_.push_back(&loop); cond_.notifyAll(); } loop.loop(); // subLoop进入事件循环 }

子步骤 2:Acceptor 开始监听(Acceptor::listen)

5、loop.loop();

运行效果:mainLoop 进入事件循环(Reactor 核心),永久循环直到quit()被调用:

  1. 调用Poller::poll()(epoll_wait),阻塞等待事件(新连接 / 唤醒事件);
  2. 遍历触发的事件列表,调用对应 Channel 的handleEvent()
  3. 执行pendingFunctors_(跨线程待执行的函数,比如 newConnection)。
void EventLoop::loop() { looping_ = true; quit_ = false; while (!quit_) { activeChannels_.clear(); // 1. epoll_wait阻塞,返回触发的事件(activeChannels_) pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); // 2. 处理所有触发的Channel事件 for (Channel *channel : activeChannels_) { channel->handleEvent(pollReturnTime_); } // 3. 执行跨线程的待执行函数 doPendingFunctors(); } looping_ = false; }

三、onMessage 回调逐行解析(核心业务逻辑)

当客户端发送数据时,会触发onMessage回调,先看回调的触发前置条件,再逐行解析:

前置:onMessage 的触发链路(从客户端发数据到回调执行)

  1. 客户端调用send()发数据 → 服务端 8080 对应的客户端 fd 触发 EPOLLIN;
  2. 该 fd 所属的 subLoop 的 Poller(epoll_wait)检测到事件,返回activeChannels_
  3. 调用Channel::handleEvent()→ 触发TcpConnection::handleRead()
  4. handleRead()从 fd 读取数据到inputBuffer_,调用messageCallback_(即 onMessage)。
// 1. Channel处理读事件(Channel.cc) void Channel::handleEventWithGuard(Timestamp receiveTime) { if (revents_ & (EPOLLIN | EPOLLPRI)) { if (readCallback_) readCallback_(receiveTime); // 调用TcpConnection::handleRead } } // 2. TcpConnection读取数据(TcpConnection.cc) void TcpConnection::handleRead(Timestamp receiveTime) { int savedErrno = 0; ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); // 读数据到inputBuffer_ if (n > 0) { // 调用消息回调(即EchoServer::onMessage) messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); } }

std::string msg = buf->retrieveAllAsString();

conn->send(msg);

// 1. TcpConnection::send(TcpConnection.cc) void TcpConnection::send(const std::string &buf) { if (state_ == kConnected) { if (loop_->isInLoopThread()) { // 若在subLoop线程,直接发 sendInLoop(buf.c_str(), buf.size()); } else { // 跨线程,放入pendingFunctors_ loop_->runInLoop( std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size())); } } } // 2. 核心发送逻辑(TcpConnection.cc) void TcpConnection::sendInLoop(const void *data, size_t len) { ssize_t nwrote = 0; size_t remaining = len; bool faultError = false; // 情况1:无待发送数据,直接write if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { nwrote = ::write(channel_->fd(), data, len); if (nwrote >= 0) { remaining = len - nwrote; if (remaining == 0 && writeCompleteCallback_) { // 发送完成,触发回调 loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this())); } } } // 情况2:write未发完,剩余数据存入outputBuffer_,注册EPOLLOUT if (!faultError && remaining > 0) { size_t oldLen = outputBuffer_.readableBytes(); // 高水位回调(超过64M触发) if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_) { loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining)); } outputBuffer_.append((char *)data + nwrote, remaining); // 存入缓冲区 if (!channel_->isWriting()) { channel_->enableWriting(); // 注册EPOLLOUT事件 } } } // 3. EPOLLOUT触发时,发送剩余数据(TcpConnection.cc) void TcpConnection::handleWrite() { if (channel_->isWriting()) { ssize_t n = outputBuffer_.writeFd(channel_->fd(), &savedErrno); // 写数据 if (n > 0) { outputBuffer_.retrieve(n); // 清空已发送数据 if (outputBuffer_.readableBytes() == 0) { channel_->disableWriting(); // 取消EPOLLOUT if (writeCompleteCallback_) { loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this())); } } } } }

// conn->shutdown();

// 1. 关闭写端(TcpConnection.cc) void TcpConnection::shutdown() { if (state_ == kConnected) { setState(kDisconnecting); loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this)); } } void TcpConnection::shutdownInLoop() { if (!channel_->isWriting()) { // 发送缓冲区已空 socket_->shutdownWrite(); // 关闭写端 } } // 2. 处理EPOLLHUP(Channel.cc) void Channel::handleEventWithGuard(Timestamp receiveTime) { if ((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN)) { if (closeCallback_) closeCallback_(); // 调用TcpConnection::handleClose } } // 3. 销毁连接(TcpConnection.cc) void TcpConnection::handleClose() { setState(kDisconnected); channel_->disableAll(); TcpConnectionPtr connPtr(shared_from_this()); connectionCallback_(connPtr); // 触发onConnection(Connection DOWN) closeCallback_(connPtr); // 调用TcpServer::removeConnection }

四、回调实现的核心逻辑(从注册到触发)

整个 muduo 的回调体系基于「函数对象(std::function)+ 绑定(std::bind)+ 传递」,核心步骤:

1. 回调注册(testserver.cc → TcpServer → TcpConnection)

  • testserver.cc:将onConnection/onMessage绑定为函数对象,通过TcpServer::setXxxCallback存入 TcpServer;
  • TcpServer::newConnection:创建 TcpConnection 时,将 TcpServer 的回调传递给 TcpConnection;
  • TcpConnection:将回调存入自身成员(connectionCallback_/messageCallback_),并关联到 Channel 的事件处理。

2. 回调触发(事件驱动)

  • 连接事件:TcpConnection 创建 / 销毁时,调用connectionCallback_(onConnection);
  • 消息事件:客户端发数据触发读事件,调用messageCallback_(onMessage);
  • 所有回调最终由EventLoop的事件循环驱动,保证在对应线程(mainLoop/subLoop)执行。

五、总结

这个测试例子的核心是Reactor 模型的落地

  1. mainLoop监听新连接,subLoop处理读写,实现多线程并发;
  2. 回调通过「注册 - 传递 - 触发」解耦业务逻辑和底层网络;
  3. Buffer解决 TCP 粘包 / 拆包,异步发送避免线程阻塞;
  4. 整个流程基于 epoll 的事件驱动,所有操作异步非阻塞,符合高性能网络库的设计思想。

每一行代码的执行都联动了 muduo-core 的核心类:

  • EventLoop是事件循环核心;
  • TcpServer是对外封装的服务器入口;
  • Acceptor是新连接监听者;
  • TcpConnection是单个连接的封装;
  • Channel是 fd 和事件的桥梁;
  • Buffer是数据缓冲区,解决读写效率问题。

回调的本质是「将业务函数以函数对象的形式传递给底层,底层在特定事件触发时调用」,而 muduo 通过std::functionstd::bind实现了回调的灵活绑定,通过shared_ptr保证对象生命周期安全。

http://www.jsqmd.com/news/632207/

相关文章:

  • 手把手教你解决PyTorch的nn、optim模块导入失败:从环境配置到文件命名的避坑全指南
  • 嵌入式无锁任务队列:裸机与RTOS下的零内存分配串行化方案
  • SITS2026多语言支持白皮书核心解密(覆盖197种语言的Tokenization重构工程)
  • MelonLoader完整教程:5分钟学会Unity游戏模组加载终极方案
  • ESP32/ESP8266轻量Toggl时间条目API客户端
  • qemu虚拟机复制
  • 【GUI-Agent】阶跃星辰 GUI-MCP 解读---()---执行层链
  • 告别阻塞!用 PHP TrueAsync 实现 PHP 脚本提速 倍矢
  • Rails 7中的表单验证与错误处理
  • PHP源码是否依赖特定芯片组_Intel与AMD平台差异【操作】
  • ROS2深度相机标定实战:从驱动配置到结果应用全解析
  • 用 AI Coding 工具生成 万字奇幻世界设定的实践记录惩
  • 深入解析C++ I/O流控制标志:ios_base与ios命名空间下的模式对比
  • 高德定位电子围栏Kotlin实现
  • 2026年4月打卡机产品推荐,打卡机产品博锐诚信务实提供高性价比服务 - 品牌推荐师
  • 选股小龙虾智能选股系统-2026.4.12.13 版本完整技术报告(修订版)
  • jQuery元素遍历与条件检测
  • Qt打印功能实战:5分钟搞定QPrintDialog配置与常见问题排查
  • 维纶触摸屏程序项目:威纶通界面UI应用,适用于EB Pro6.00及以上版本,IP与IE系列屏...
  • 玩一玩微软的 bit 模型:BitNet. 一个 CPU 就能跑起来的大模型欠
  • TCLB(CUDA Lattice Boltzmann)项目介绍
  • 网页开发四剑客:HTML/CSS/JS/PHP全解析
  • 2026鄂破专用固定式机械臂标杆名录:液压固定式破碎锤、矿业破碎锤、破碎生产线固定式机械臂、破碎生产线固定式破碎锤选择指南 - 优质品牌商家
  • BMH08101血氧心率模块UART协议解析与嵌入式集成
  • Windows10 ARM64 QtWidget工程构建:从环境配置到交叉编译实战
  • 看2026年4月电线电缆布局公司推荐,选优质企业不迷路,矿用电缆/线缆/电线电缆/防火电缆,电线电缆制造厂家有哪些 - 品牌推荐师
  • 【UE组件解析】从功能到渲染:Actor、Scene与Primitive组件的核心差异与应用场景
  • 分库分表专题
  • 一个简洁易用的 Delphi JSON 封装库,基于 System.JSON`单元封装,提供更直观的 API文
  • 【AI Agent实战】OpenClaw Skill 技能系统详解:从 Function Calling 到 MCP 到 Skill 的完整演进