当前位置: 首页 > news >正文

NET 8 封装自己的 rabbtMQ

项目地址

https://github.com/sansantang/Jonckers.RabbitMQ.HttpApi.Order
1 支持自定义 QoS (默认 PrefetchSize = 0, PrefetchCount = 1, Global = false)
2 支持死信队列

怎么使用

1. 服务注册

appsettings.json

{"RabbitMQConnection": {"HostName": "192.168.49.151", // RabbitMQ 主机"Port": 5672, // 端口(默认5672)"UserName": "admin", // 用户名(默认guest)"Password": "admin123", // 密码(默认guest)"VirtualHost": "/", // 虚拟主机(默认/)"ExchangeName": "DefaultExchange", // 默认交换机"RetryCount": 3, // 重试次数"ConnectionTimeout": 10 // 连接超时时间(秒)}
}

在Program.cs中有以下代码:

builder.Services.AddMyRabbitMQ(builder.Configuration);
//注册消费者
//builder.Services.AddMyRabbitMQEventHandlers(typeof(PerryTest).Assembly.GetTypes());
//...
app.UseMyEventHandler();

2. 生产者

image
image

using Jonckers.RabbitMQ.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace Jonckers.RabbitMQ.Service.ConsumerMessageModel
{[RabbitMQEvent(queue: "jonckers.enterpriseordering.requestevent")]public class PerryTest{public Guid Id { get; set; }public string? Name { get; set; }public int Count { get; set; }public string? Remark { get; set; }}
}

WeatherForecastController中:

public IMyPublisher<PerryTest> TestPublisher { get; }public WeatherForecastController(ILogger<WeatherForecastController> logger, IMyPublisher<PerryTest> testPublisher)
{_logger = logger;TestPublisher = testPublisher;
}

发送消息

当TestAsync方法被调用时:

[HttpGet("test")]
public async Task<string> TestAsync()
{var data = new PerryTest(){Id = Guid.NewGuid(),Name = "AAA",Count = 123,Remark = "测试一下"};await TestPublisher.PublishAsync(data);return "发送了一条消息";
}

此时使用的TestPublisher就是通过上述过程创建的MyPublisher<PerryTest>实例,该实例是通过第2个构造函数初始化的。

3. 消费者

注册消费者

builder.Services.AddMyRabbitMQEventHandlers(typeof(PerryTest).Assembly.GetTypes());

创建消费者

using Jonckers.RabbitMQ.Core.Service;
using Jonckers.RabbitMQ.Service.ConsumerMessageModel;namespace Jonckers.RabbitMQ.HttpApi.Order.Consumer
{public class PerryTestEventHandler : MyEventHandler<PerryTest>{public override Task OnReceivedAsync(PerryTest data, string message){Console.WriteLine(message);return Task.CompletedTask;}public override void OnConsumerException(Exception ex){Console.WriteLine(ex.Message);}}
}

配置 Qos 参数

参数名 类型 含义 推荐值 说明
prefetchSize ushort 每次预取消息的总大小限制(字节) 0 (不限制) 一般用不到,设为 0 表示不限制消息大小
prefetchCount ushort 每次预取的消息数量上限(未确认的消息数) 1 ~ N(根据业务调整) 最关键参数!控制未 Ack 消息数
global bool 是否应用到该连接上的所有消费者 false (推荐) 如果为 true ,则对所有消费者生效;一般设为 false ,针对每个消费者单独设置
public class PerryTestEventHandler : MyEventHandler<PerryTest>
{public PerryTestEventHandler(){// 配置 QoS 参数Options.PrefetchSize = 0;Options.PrefetchCount = 2;    // 每次处理2条消息Options.Global = false;}public override Task OnReceivedAsync(PerryTest data, string message){Console.WriteLine(message);return Task.CompletedTask;}public override void OnConsumerException(Exception ex){Console.WriteLine(ex.Message);}
}

image

死信队列

过期或拒绝到死信队列

1 注册

启用 isWithDeadLetter = ture, 才能设置过期时间 expirationMilliseconds

using Jonckers.RabbitMQ.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace Jonckers.RabbitMQ.Service.ConsumerMessageModel
{[RabbitMQEvent(queue: "jonckers.enterpriseordering.deadletter", routingkey: "jonckers.enterpriseordering.deadletter", isWithDeadLetter: true, expirationMilliseconds: 60000)]public class DeadLetterTest{public Guid Id { get; set; }public string? Name { get; set; }public int Count { get; set; }public string? Remark { get; set; }}
}

2 生产者

image

发送消息 PublishWithDeadLetterAsync

[ApiController]
[Route("[controller]")]
public class WeatherForecastController : ControllerBase
{public IMyPublisher<DeadLetterTest> TestPublisher { get; }private readonly ILogger<WeatherForecastController> _logger;public WeatherForecastController(ILogger<WeatherForecastController> logger, IMyPublisher<DeadLetterTest> testPublisher){_logger = logger;TestPublisher = testPublisher;}[HttpGet("test")]public async Task<string> TestAsync(){var data = new DeadLetterTest(){Id = Guid.NewGuid(),Name = "AAA",Count = 123,Remark = "哈哈哈"};await TestPublisher.PublishAsync(data);return "发送了一个消息";}
}

3 消费者:

1 注册消费者

// 注册到services
builder.Services.AddMyRabbitMQEventHandlers(typeof(DeadLetterTestEventHandler).Assembly);

2 监听正常的消费者

using Jonckers.RabbitMQ.Core.Service;
using Jonckers.RabbitMQ.Service.ConsumerMessageModel;namespace Jonckers.RabbitMQ.HttpApi.Order.Consumer
{public class DeadLetterTestEventHandler : MyEventHandler<DeadLetterTest>{public override Task OnReceivedAsync(DeadLetterTest data, string message){Console.WriteLine(message);return Task.CompletedTask;}public override void OnConsumerException(Exception ex){Console.WriteLine(ex.Message);}}
}

3 监听死信
需要自己再监听一个死信队列,注意命名

var deadLetterExchangeName = _exchangeName + ".dlx-exchange";
var deadLetterQueueName = _queueName + ".dlx-queue";
var deadLetterRoutingKey = _routingKeyName + ".dlrk-routingKey";

参考

https://gitee.com/wosperry/wosperry-rabbit-mqtest

http://www.jsqmd.com/news/48387/

相关文章:

  • dropMimeData
  • Terrorform-自动化创建EKS集群
  • 最长单词2
  • Django 学习路线图 - 教程
  • Tefrorform-自动化创建IAM
  • 积极想到二维数组的递推
  • [人工智能-大模型-55]:模型层技能 - AI的算法、数据结构中算法、逻辑处理的算法异同
  • Terrorform-自动化配置AWS EC2
  • Terrorform-自动化配置AWS Route53
  • elasticSearch之API:索引运行
  • 20232406 2025-2026-1 《网络与系统攻防技术》 实验六实验报告
  • Monit-基于非容器服务自恢复程序实践
  • 人工智能之编程进阶 Python高级:第十章 知识点总结
  • 这篇题为《手指沾满白河水:AI元人文的批判与建构》的论文
  • 《手指沾满白河水:AI元人文的批判与建构》
  • 让你的动画“活”过来:Manim 节奏控制指南 (Rate Functions)
  • 《沉默的审查:高度原创性理论在预印本平台中的识别困境与范式危机——以“AI元人文”投稿为例》
  • 人工智能之编程进阶 Python高级:第九章 爬虫类模块
  • iOS虚拟现实开发如何降低成本
  • iOS虚拟现实开发如何提升性能
  • 2025专业防水补漏公司推荐——尤卉防水,连锁企业,上海、青岛、沧州、沈阳等多城市首选品牌,品质优、时效快,住宅补漏,工建检修,防水全流程服务。
  • 基于Prometheus-实现AWS EC2的实例异常自动重启
  • 2025 年 11 月伺服压力机厂家权威推荐榜:苏州小型电动精密四柱 C 型电缸节能智能高精度电子伺服油压机液压热压装机专业解析
  • 2025 年 11 月电热管厂家权威推荐榜:不锈钢/单头/空气干烧/浸入式/分流板/热流板/翅片/铁氟龙/工业电热管,高效耐用精准控温
  • PostgreSQL数据库技术革新与AI功能解析
  • ios基于linux还是unix
  • 2025加湿器水泵品牌TOP5推荐,宠物饮水机水泵、冷风扇水泵、水暖毯水泵等微型水泵源头厂商品质性价比选择指南
  • HarmonyOS Canvas开发指南 - 指南
  • 深入解析:基于python的化妆品销售分析系统
  • LeetCode 面试经典 150_链表_反转链表 II(60_92_C++_中等)(头插法) - 教程