近几天在基于阿里云 AMQP
版消息队列开发业务,之所以选择这个而不是阿里云主推的 RocketMQ
,是因为我想用的是 RabbitMQ
。选择阿里云而非自己部署,无非是为了节省运维成本。相对来说官方的 PHP
版本 Demo
非常简陋,而且虽然官方宣传完全兼容 RabbitMQ
,但还是有一些坑的。
官方介绍:消息队列 AMQP
版由阿里云基于 AMQP
标准协议自研,完全兼容 RabbitMQ
开源生态以及多语言客户端,打造分布式、高吞吐、低延迟、高可扩展的云消息服务。开箱即用,用户无需部署免运维,轻松实现快速上云,阿里云提供全托管服务,更专业、更可靠、更安全。
本文主要涉及的是关于 延迟消息 这一高级特性的实现。
按照阿里云官方的介绍,只要在消息头里设置 delay
属性即可。按照官方的 Demo
的确可以测试通过,但是官方的 Demo
是未定义 Exchange
直接向队列发送消息,而且队列未设置任何的属性。实际业务肯定要复杂一点,定义有 Exchange
,而且队列设置有 x-message-ttl
及 对应的死信队列。由于各种尝试延迟功能无法实现,就在阿里云发起了工单进行咨询,结果工单回复了数次也没有解决。最终,还是经过多次尝试,发现问题出在队列的 x-message-ttl
属性上。也就是说,队列不定义这个消息存活时间,则可以正常实现延迟消息功能,具体原因还待进一步分析。
基于官方的简陋 Demo
对生产端及消费端做了一些完善,以下代码可以测试通过。
官方 Demo
地址:amqp-php-demo
消费端(receiveWithProp.php
)代码:
/**
* 消费端|只包含具体逻辑部分
* receiveWithProp.php
*/
try {
// 创建连接
$connectionUtil = new ConnectionUtil($host, $port, $virtualHost, $accessKey, $accessSecret, $instanceId);
$connection = $connectionUtil->getConnection();
// 创建信道
$channel = $connection->channel();
// 声明交换机
$channel->exchange_declare('exchange', 'direct', false, true, false, false, false);
// 声明队列
$channel->queue_declare('queue', false, true, false, false, false, new AMQPTable([
// 'x-message-ttl' => 43200000, // 定义这个就测试失败
'x-dead-letter-exchange' => 'exchange',
'x-dead-letter-routing-key' => 'deadLetter.key'
]));
// 绑定队列
$channel->queue_bind('queue', 'exchange', 'key');
// 声明死信队列
$channel->queue_declare('deadLetter.queue', false, true, false, false, false);
// 绑定死信队列
$channel->queue_bind('deadLetter.queue', 'exchange', 'deadLetter.key');
//
echo " [*] Waiting for messages. To exit press CTRL+C\n";
// 回调函数
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
$headers = $msg->get('application_headers');
echo ' [x] Received ', $headers->getNativeData()['delay'], "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
// 消费消息
$channel->basic_consume('queue', '', false, false, false, false, $callback);
// 阻塞等待
while (count($channel->callbacks)) {
$channel->wait();
}
// 关闭连接
$channel->close();
$connection->close();
} catch (Exception $e) {
//
echo $e->getMessage(), PHP_EOL;
}
生产端(sendWithProp.php
)代码:
/**
* 生产端|只包含具体逻辑部分
*/
try {
// 创建连接
$connectionUtil = new ConnectionUtil($host, $port, $virtualHost, $accessKey, $accessSecret, $instanceId);
$connection = $connectionUtil->getConnection();
// 创建信道
$channel = $connection->channel();
// 消息内容
$msgBody = json_encode(['message'=>'blog.phpha.com']);
// 消息头|延迟5秒
$amqpTable = new AMQPTable(['delay'=>5000]);
// 创建消息
$msg = new AMQPMessage($msgBody, [
// 设置消息头
'application_headers' => $amqpTable,
// 永久存储
'delivery_mode' => 2
]);
// 发布消息|通过交换机
$channel->basic_publish($msg, 'exchange', 'key');
echo " [x] 已发布\n";
// 关闭连接
$channel->close();
$connection->close();
} catch (Exception $e) {
//
echo $e->getMessage(), PHP_EOL;
}