当前位置: 首页 > news >正文

PHPPHP与消息队列RabbitMQ集成

PHP与消息队列RabbitMQ集成

RabbitMQ是流行的消息中间件。PHP通过AMQP扩展或php-amqplib库连接RabbitMQ。今天说说PHP与RabbitMQ的集成。

连接RabbitMQ。

```php
// composer require php-amqplib/php-amqplib

require 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

echo "已连接到RabbitMQ\n";

$channel->close();
$connection->close();
?>

发送消息到队列。

```php
function sendToQueue(string $queueName, array $data): void
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare($queueName, false, true, false, false);

$message = new AMQPMessage(json_encode($data), [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]);

$channel->basic_publish($message, '', $queueName);
echo "消息已发送到队列: $queueName\n";

$channel->close();
$connection->close();
}

sendToQueue('task_queue', ['task' => 'send_email', 'to' => 'user@example.com']);
?>

消费消息。

```php
function consumeQueue(string $queueName, callable $handler): void
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare($queueName, false, true, false, false);
echo "等待消息...\n";

$channel->basic_consume($queueName, '', false, false, false, false, function ($msg) use ($handler) {
echo "收到消息\n";
$data = json_decode($msg->body, true);

try {
$handler($data);
$msg->ack();
echo "处理完成\n";
} catch (Exception $e) {
echo "处理失败: {$e->getMessage()}\n";
$msg->nack(true);
}
});

while ($channel->is_consuming()) {
$channel->wait();
}

$channel->close();
$connection->close();
}

consumeQueue('task_queue', function ($data) {
echo "处理: {$data['task']}\n";
});
?>

发布订阅模式。

```php
// 发布者
function publish(string $exchangeName, array $data): void
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare($exchangeName, 'fanout', false, false, false);
$message = new AMQPMessage(json_encode($data));
$channel->basic_publish($message, $exchangeName);

echo "已发布到交换器: $exchangeName\n";

$channel->close();
$connection->close();
}

// 订阅者
function subscribe(string $exchangeName, string $queueName, callable $handler): void
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare($exchangeName, 'fanout', false, false, false);
$channel->queue_declare($queueName, false, false, false, false);
$channel->queue_bind($queueName, $exchangeName);

$channel->basic_consume($queueName, '', false, true, false, false, function ($msg) use ($handler) {
$handler(json_decode($msg->body, true));
});

while ($channel->is_consuming()) {
$channel->wait();
}
}
?>

延迟队列的实现。

```php
function sendDelayed(string $queueName, array $data, int $delayMs): void
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$message = new AMQPMessage(json_encode($data), [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'application_headers' => new AMQPTable([
'x-delay' => $delayMs,
]),
]);

$channel->basic_publish($message, '', $queueName);
echo "延迟消息已发送 ({$delayMs}ms后执行)\n";
}
?>

RabbitMQ是功能完整的消息队列系统。支持多种消息模式、消息持久化、ACK确认、延迟队列。PHP通过php-amqplib库可以方便地集成RabbitMQ,适合需要可靠消息传递的场景。

http://www.jsqmd.com/news/959283/

相关文章:

  • OpenCode直逼20万star,开源AI编程王者的基础教程(含国产模型配置)
  • 保姆级教程:用PostgreSQL+PostGIS+GeoServer搞定OSM地图发布(附避坑指南)
  • PyQt5界面美化实战:从.qrc文件到炫酷背景,手把手教你玩转CSS样式
  • 从‘盲猜’到‘有理有据’:Armijo准则如何拯救你的优化算法不收敛?
  • SI5341时钟芯片配置避坑指南:如何用Verilog SPI驱动替代ClockBuilder Pro手动操作
  • 2026绵阳正规家政公司推荐榜 高效响应更贴心 - 优质品牌商家
  • 四川了无痕环保设备:移动厕所服务技术及联系推荐 - 优质品牌商家
  • 腾讯Xcheck实战:5分钟搞定Java Spring项目的代码安全扫描(附误报优化心得)
  • Foobar2000播放DSD512卡顿闪退?可能是你的插件组合和系统平台在‘打架’
  • 告别定位漂移:用Python+开源IGNav库,手把手实现你的第一个RTK/INS紧组合算法
  • ICEM CFD网格镜像实战:告别uncovered faces,5步搞定半模转全模
  • CubeIDE官方不支持DAP-Link?三步教你用OpenOCD“曲线救国”(以STM32F4为例)
  • 给TMS320F28377D做个‘心脏搭桥’:手把手教你配置双工程Bootloader的CMD文件
  • 告别卡尔曼滤波?用DETR的‘亲儿子’TrackFormer搞定多目标跟踪(附MOT17实战分析)
  • 2026年知名的迎宾机器人/人形机器人/机器人推荐厂家精选 - 品牌宣传支持者
  • 从智能车竞赛到DIY电源:固态电容如何解决我的大功率电路‘发烧’难题
  • Android与Linux的Ping命令差异全解析:从超时参数-W到-w,别再被网上教程误导了
  • 别再自己造轮子了!手把手教你用Cadence/Synopsys VIP加速SoC验证(附自研VIP开发避坑指南)
  • 从手机拍照到视频播放:一文搞懂Android相机默认的NV21格式(YUV420SP详解)
  • 别再瞎试了!用FFmpeg -buildconf 命令读懂编译选项,定制你的专属音视频工具链
  • 别再只用if-else了!用Python的异或运算符(^)让你的代码更简洁高效
  • 2026成都搬家服务评测:绿色老兵及同行服务对比 - 优质品牌商家
  • 别再为相似物料头疼了!SAP MM物料版次实战:用ECN+版次搞定变更,告别混乱
  • 油气管道石蜡沉积动态仿真工具:MATLAB GUI版,含温度/流速影响分析与可视化结果
  • PHP临时文件与缓存管理
  • 51单片机红外遥控控制图片轮播与蜂鸣器音乐播放(含数码管编号显示)
  • 告别黑屏!手把手教你用NodeMCU ESP8266点亮1.44寸ST7735屏幕(TFT_eSPI库配置避坑指南)
  • PHPGraphQL与RESTfulAPI对比
  • LIO-SAM保姆级调试笔记:从IMU标定到地图保存的完整避坑指南
  • 别只调学习率了!聊聊对比学习和知识蒸馏里那个神秘的‘温度’参数T