近几天在基于阿里云 AMQP 版消息队列开发业务,之所以选择这个而不是阿里云主推的 RocketMQ ,是因为我想用的是 RabbitMQ 。选择阿里云而非自己部署,无非是为了节省运维成本。相对来说官方的 PHP 版本 Demo 非常简陋,而且虽然官方宣传完全兼容 RabbitMQ ,但还是有一些坑的。

官方介绍:消息队列 AMQP 版由阿里云基于 AMQP 标准协议自研,完全兼容 RabbitMQ 开源生态以及多语言客户端,打造分布式、高吞吐、低延迟、高可扩展的云消息服务。开箱即用,用户无需部署免运维,轻松实现快速上云,阿里云提供全托管服务,更专业、更可靠、更安全。

本文主要涉及的是关于 延迟消息 这一高级特性的实现。

按照阿里云官方的介绍,只要在消息头里设置 delay 属性即可。按照官方的 Demo 的确可以测试通过,但是官方的 Demo 是未定义 Exchange 直接向队列发送消息,而且队列未设置任何的属性。实际业务肯定要复杂一点,定义有 Exchange ,而且队列设置有 x-message-ttl 及 对应的死信队列。由于各种尝试延迟功能无法实现,就在阿里云发起了工单进行咨询,结果工单回复了数次也没有解决。最终,还是经过多次尝试,发现问题出在队列的 x-message-ttl 属性上。也就是说,队列不定义这个消息存活时间,则可以正常实现延迟消息功能,具体原因还待进一步分析。

基于官方的简陋 Demo 对生产端及消费端做了一些完善,以下代码可以测试通过。

官方 Demo 地址:amqp-php-demo

消费端(receiveWithProp.php)代码:

/**
 * 消费端|只包含具体逻辑部分
 * receiveWithProp.php
 */
try {
    // 创建连接
    $connectionUtil = new ConnectionUtil($host, $port, $virtualHost, $accessKey, $accessSecret, $instanceId);
    $connection = $connectionUtil->getConnection();
    // 创建信道
    $channel = $connection->channel();
    // 声明交换机
    $channel->exchange_declare('exchange', 'direct', false, true, false, false, false);
    // 声明队列
    $channel->queue_declare('queue', false, true, false, false, false, new AMQPTable([
        // 'x-message-ttl' => 43200000, // 定义这个就测试失败
        'x-dead-letter-exchange' => 'exchange',
        'x-dead-letter-routing-key' => 'deadLetter.key'
    ]));
    // 绑定队列
    $channel->queue_bind('queue', 'exchange', 'key');
    // 声明死信队列
    $channel->queue_declare('deadLetter.queue', false, true, false, false, false);
    // 绑定死信队列
    $channel->queue_bind('deadLetter.queue', 'exchange', 'deadLetter.key');
    //
    echo " [*] Waiting for messages. To exit press CTRL+C\n";
    // 回调函数
    $callback = function ($msg) {
        echo ' [x] Received ', $msg->body, "\n";
        $headers = $msg->get('application_headers');
        echo ' [x] Received ', $headers->getNativeData()['delay'], "\n";
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };
    // 消费消息
    $channel->basic_consume('queue', '', false, false, false, false, $callback);
    // 阻塞等待
    while (count($channel->callbacks)) {
        $channel->wait();
    }
    // 关闭连接
    $channel->close();
    $connection->close();
} catch (Exception $e) {
    //
    echo $e->getMessage(), PHP_EOL;
}

生产端(sendWithProp.php)代码:

/**
 * 生产端|只包含具体逻辑部分
 */
try {
    // 创建连接
    $connectionUtil = new ConnectionUtil($host, $port, $virtualHost, $accessKey, $accessSecret, $instanceId);
    $connection = $connectionUtil->getConnection();
    // 创建信道
    $channel = $connection->channel();
    // 消息内容
    $msgBody = json_encode(['message'=>'blog.phpha.com']);
    // 消息头|延迟5秒
    $amqpTable = new AMQPTable(['delay'=>5000]);
    // 创建消息
    $msg = new AMQPMessage($msgBody, [
        // 设置消息头
        'application_headers' => $amqpTable,
        // 永久存储
        'delivery_mode' => 2
    ]);
    // 发布消息|通过交换机
    $channel->basic_publish($msg, 'exchange', 'key');
    echo " [x] 已发布\n";
    // 关闭连接
    $channel->close();
    $connection->close();
} catch (Exception $e) {
        //
    echo $e->getMessage(), PHP_EOL;
}

标签:PHP 消息队列