PHPPHP与消息队列RabbitMQ集成
PHP与消息队列RabbitMQ集成
RabbitMQ是流行的消息中间件。PHP通过AMQP扩展或php-amqplib库连接RabbitMQ。今天说说PHP与RabbitMQ的集成。
连接RabbitMQ。
```php
// composer require php-amqplib/php-amqplib
require 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
echo "已连接到RabbitMQ\n";
$channel->close();
$connection->close();
?>
发送消息到队列。
```php
function sendToQueue(string $queueName, array $data): void
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare($queueName, false, true, false, false);
$message = new AMQPMessage(json_encode($data), [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]);
$channel->basic_publish($message, '', $queueName);
echo "消息已发送到队列: $queueName\n";
$channel->close();
$connection->close();
}
sendToQueue('task_queue', ['task' => 'send_email', 'to' => 'user@example.com']);
?>
消费消息。
```php
function consumeQueue(string $queueName, callable $handler): void
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare($queueName, false, true, false, false);
echo "等待消息...\n";
$channel->basic_consume($queueName, '', false, false, false, false, function ($msg) use ($handler) {
echo "收到消息\n";
$data = json_decode($msg->body, true);
try {
$handler($data);
$msg->ack();
echo "处理完成\n";
} catch (Exception $e) {
echo "处理失败: {$e->getMessage()}\n";
$msg->nack(true);
}
});
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();
}
consumeQueue('task_queue', function ($data) {
echo "处理: {$data['task']}\n";
});
?>
发布订阅模式。
```php
// 发布者
function publish(string $exchangeName, array $data): void
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare($exchangeName, 'fanout', false, false, false);
$message = new AMQPMessage(json_encode($data));
$channel->basic_publish($message, $exchangeName);
echo "已发布到交换器: $exchangeName\n";
$channel->close();
$connection->close();
}
// 订阅者
function subscribe(string $exchangeName, string $queueName, callable $handler): void
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare($exchangeName, 'fanout', false, false, false);
$channel->queue_declare($queueName, false, false, false, false);
$channel->queue_bind($queueName, $exchangeName);
$channel->basic_consume($queueName, '', false, true, false, false, function ($msg) use ($handler) {
$handler(json_decode($msg->body, true));
});
while ($channel->is_consuming()) {
$channel->wait();
}
}
?>
延迟队列的实现。
```php
function sendDelayed(string $queueName, array $data, int $delayMs): void
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$message = new AMQPMessage(json_encode($data), [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'application_headers' => new AMQPTable([
'x-delay' => $delayMs,
]),
]);
$channel->basic_publish($message, '', $queueName);
echo "延迟消息已发送 ({$delayMs}ms后执行)\n";
}
?>
RabbitMQ是功能完整的消息队列系统。支持多种消息模式、消息持久化、ACK确认、延迟队列。PHP通过php-amqplib库可以方便地集成RabbitMQ,适合需要可靠消息传递的场景。
