当前位置: 首页 > news >正文

Chap22-DistributedLock_MultiServer

Chap22-DistributedLock_MultiServer

前文我们实现了单服务器踢人的逻辑,通过分布式锁锁住登录过程,在这个期间对用户相关的信息进行更改,主要包括用户id对应的serverip, sessionid等。

同时对用户离线消息进行了处理,也是通过分布式锁锁住退出过程,判断此时用户id对应的sessionid是否和本服记录相等,如果不相等则说明有用户异地登录,此时只要退出即可,否则要清理id对应的sessionid以及serverip等信息。

接下来我们实现跨服踢人逻辑

RPC封装

因为涉及到了跨服务器,我们需要再添加新的grpc服务

message KickUserReq{int32 uid = 1;
}message KickUserRsp{int32 error = 1;int32 uid = 2;
}

添加新的客户端调用和实现

/*** @brief 通知剔除用户** @param server_ip* @param req* @return NotifyFriendOnlineResponse*/KickUserRsp NotifyKickUser(std::string server_ip, const KickUserReq &req);KickUserRsp ChatGrpcClient::NotifyKickUser(std::string server_ip, const KickUserReq &req)
{KickUserRsp rsp;rsp.set_error(static_cast<int>(ErrorCodes::RPCFAILED));Defer defer([&rsp, &req]() {rsp.set_uid(req.uid());});auto it = _pool.find(server_ip);if (it == _pool.end()) {return rsp;}auto& pool = it->second;grpc::ClientContext context;auto stub = pool->GetConnection();Defer defer2([&pool, &stub]() {pool->ReturnConnection(std::move(stub));});Status status = stub->NotifyKickUser(&context, req, &rsp);if (!status.ok()) {return rsp;}rsp.set_error(static_cast<int>(ErrorCodes::SUCCESS));return rsp;
}

添加新的服务端实现

/*** @brief 通知踢人** @param context* @param request* @param response* @return Status*/Status NotifyKickUser(grpc::ServerContext*context,const KickUserReq*request,KickUserRsp*response) override;Status ChatGrpcServer::NotifyKickUser(grpc::ServerContext*context,const KickUserReq*request,KickUserRsp*response)
{auto uid = request->uid();auto session = UserManager::GetInstance()->GetSession(uid);Defer defer([response]() {response->set_error(static_cast<int>(ErrorCodes::SUCCESS));});// 不在内存中if (session == nullptr) {return Status::OK;}session->NotifyOffline(uid);_server->ClearSession(session->GetSessionId());return Status::OK;
}

跨服踢人示意图

image-20251201204736963

添加逻辑

上一节单服务器踢人的时候,并没有grpc调用,这里我们加上(LogicSystem.cpp::Login):

if (b_ip) {//获取当前服务器ip信息auto& cfg = ConfigMgr::Inst();auto self_name = cfg["SelfServer"]["Name"];//如果之前登录的服务器和当前相同,则直接在本服务器踢掉if (uid_ip_value == self_name) {//查找旧有的连接auto old_session = UserMgr::GetInstance()->GetSession(uid);//此处应该发送踢人消息if (old_session) {old_session->NotifyOffline(uid);//清除旧的连接_p_server->ClearSession(old_session->GetSessionId());}}else {//如果不是本服务器,则通知grpc通知其他服务器踢掉//发送通知KickUserReq kick_req;kick_req.set_uid(uid);ChatGrpcClient::GetInstance()->NotifyKickUser(uid_ip_value, kick_req);}
}

既然要踢人,那么肯定要修改/读取有关服务器连接的数量,因此我们在StatusServer中获取连接数量寻找压力最小的服务器的时候也需要加锁:

ChatServer StatusServiceImpl::getChatServer() {std::lock_guard<std::mutex> guard(_server_mtx);auto minServer = _servers.begin()->second;auto lock_key = LOCK_COUNT;auto identifier = RedisMgr::GetInstance()->acquireLock(lock_key, LOCK_TIME_OUT, ACQUIRE_TIME_OUT);//利用defer解锁Defer defer2([this, identifier, lock_key]() {RedisMgr::GetInstance()->releaseLock(lock_key, identifier);});auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, minServer.name);if (count_str.empty()) {//不存在则默认设置为最大minServer.con_count = INT_MAX;}else {minServer.con_count = std::stoi(count_str);}// 使用范围基于for循环for ( auto& server : _servers) {if (server.second.name == minServer.name) {continue;}auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, server.second.name);if (count_str.empty()) {server.second.con_count = INT_MAX;}else {server.second.con_count = std::stoi(count_str);}if (server.second.con_count < minServer.con_count) {minServer = server.second;}}return minServer;
}

main

我们注意到在服务端处理踢人请求的时候使用了_server

session->NotifyOffline(uid);
_server->ClearSession(session->GetSessionId());

因此我们需要添加一个新的成员变量_server和成员函数SetServer,注册入这个ChatGrpcServer

就要求我们在main函数中进行设置:

#include "global/ConfigManager.h"
#include "global/const.h"
#include "grpc/ChatGrpcServer.h"
#include "redis/RedisManager.h"
#include "server/AsioPool.h"
#include "server/LogicSystem.h"
#include "server/Server.h"#include <boost/asio.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/beast/http/field.hpp>
#include <grpc++/grpc++.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server_builder.h>
#include <spdlog/spdlog.h>
#include <thread>int main()
{spdlog::set_pattern("[%Y-%m-%d %H:%M:%S.%e] [%^%l%$] [%s:%#] %v");spdlog::set_level(spdlog::level::debug);auto& cfg = ConfigManager::GetInstance();auto server_name = cfg["SelfServer"]["name"];SPDLOG_INFO("开始创建 ChatGrpcServer 实例...");{RedisManager::GetInstance()->InitCount(server_name);Defer defer([server_name]{RedisManager::GetInstance()->DelCount(server_name);RedisManager::GetInstance()->Close();});std::string server_address = cfg["SelfServer"]["host"] + ":" + cfg["SelfServer"]["RPCPort"];SPDLOG_INFO("开始创建 ChatGrpcServer 实例...");ChatGrpcServer service;SPDLOG_INFO("ChatGrpcServer 实例创建完成,地址: {}", (void*)&service);grpc::ServerBuilder builder;builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());builder.RegisterService(&service);std::unique_ptr<grpc::Server> server(builder.BuildAndStart());SPDLOG_INFO("Grpc Server On: {}", server_address);std::thread grpc_server([&server]() {server->Wait();});auto pool = AsioPool::GetInstance();boost::asio::io_context ioc;auto port = cfg["SelfServer"]["port"];auto server_ptr = std::make_shared<Server>(ioc, std::stoi(port));LogicSystem::GetInstance()->SetServer(server_ptr);service.SetServer(server_ptr);boost::asio::signal_set signals(ioc, SIGINT, SIGTERM);signals.async_wait([&ioc, pool, &server](const boost::system::error_code& /*error*/, int /*signal_number*/) {pool->Stop();ioc.stop();server->Shutdown();});server_ptr->Start();ioc.run();grpc_server.join();}return 0;
}

可以看到,我们的Server创建提前了,然后让service进行设置_server.

同时我们还封装了一下redis初始化和删除对应内容的方法,同时也要加入分布式锁。

void InitCount(const std::string& server_name);
void DelCount(const std::string& server_name);void RedisManager::InitCount(const std::string& server_name)
{auto lock_key = LOCK_COUNT;auto identifier = RedisManager::GetInstance()->AcquireLock(lock_key,LOCK_TIMEOUT,ACQUIRE_LOCK_TIMEOUT);Defer defer([identifier]{RedisManager::GetInstance()->ReleaseLock(LOCK_COUNT, identifier);});RedisManager::GetInstance()->HSet(LOGIN_COUNT_PREFIX, server_name, "0");
}void RedisManager::DelCount(const std::string& server_name)
{auto lock_key = LOCK_COUNT;auto identifier = RedisManager::GetInstance()->AcquireLock(lock_key,LOCK_TIMEOUT,ACQUIRE_LOCK_TIMEOUT);Defer defer([identifier]{RedisManager::GetInstance()->ReleaseLock(LOCK_COUNT, identifier);});RedisManager::GetInstance()->HDel(LOGIN_COUNT_PREFIX, server_name);
}
http://www.jsqmd.com/news/135659/

相关文章:

  • Asio09-SendQueueAndEndian
  • 第四章 SQL Server备份和还原
  • 5分钟使用modelengine打造儿童数字人,小白也能快速上手以低代码的方式快速搭建智能应用,从而大幅降低开发难度
  • 基于springboot在线课程管理系统的设计与实现毕业论文+PPT(附源代码+演示视频)
  • LLM - 用 SpecKit 和 AICode 改造遗留系统 完整实践指南
  • Elasticsearch数据膨胀?调优部署全攻略
  • 【计算机毕业设计案例】基于Java的停车场管理系统、预订车位系统、停车缴费(程序+文档+讲解+定制)
  • Router_路由的基本使用
  • 计算机Java毕设实战-基于Java的停车场管理系统【完整源码+LW+部署说明+演示视频,全bao一条龙等】
  • 快速检查Ubuntu进程是否运行的3种方法
  • Java毕设项目:基于springboot的公司财务预算管理系统(源码+文档,讲解、调试运行,定制等)
  • 贝叶斯优化Transformer-LSTM的模型结构图
  • 番茄小说下载器 2025.12.21 | 现代化、高效的番茄小说下载器,支持批量下载和多种格式导出
  • 计算机Java毕设实战-基于SpringBoot的植物知识管理与分享平台的设计与实现家庭园艺种植分享平台设计与实现【完整源码+LW+部署说明+演示视频,全bao一条龙等】
  • Router_路由重定向和其他小细节问题
  • Java毕设项目:基于SpringBoot+Vue技术的医疗器械管理系统设计与实现(源码+文档,讲解、调试运行,定制等)
  • Redis 数据结构底层与 Hash 优于 JSON 的工程实践
  • STM32平衡车工具-匿名助手+虚拟串口如何使用。
  • 编码器测速思路,以及如何进行测速,速度调整
  • 从零开始学C++:STL简介
  • 【计算机毕业设计案例】基于springboot+vue技术的二手车交易管理系统的设计与实现(程序+文档+讲解+定制)
  • 别再“+”到天亮!String.format 一键拯救Java字符串拼接,高可读+可维护神操作
  • Router_编程式路由
  • 重装数次arch_linux有感
  • Java毕设选题推荐:基于springboot+vue技术的二手车交易管理系统的设计与实现汽车管理汽车品牌管理,公告类型管理,论坛管理【附源码、mysql、文档、调试+代码讲解+全bao等】
  • Java五种文件拷贝方式
  • 2-[(2-叠氮乙酰基)氨基]-2-脱氧-D-吡喃甘露糖—糖生物学与代谢标记的关键化学探针 1971934-97-0
  • 萤石开放平台 国标设备接入 |常见问题
  • 计算机Java毕设实战-基于springboot+vue技术的二手车交易管理系统的设计与实现基于SpringBoot+Vue的二手车交易平台设计【完整源码+LW+部署说明+演示视频,全bao一条龙等】
  • 总结