简单使用php+redis实现消息队列推送

来源:赵克立博客 分类: PHP 标签:消息队列MQNoSQLRedis发布时间:2019-05-16 15:20:05最后更新:2019-12-03 11:36:05浏览:718
版权声明:
本文为博主原创文章,转载请声明原文链接...谢谢。o_0。
温馨提示:
技术类文章有它的时效性,请留意文章更新时间以及软件的版本
更新时间:
2019-12-03 11:36:05

前言

php+redis 实现发布订阅功能

如果想用redis实现,订单队列等关键性东西,建议不要用redis。因为持久化,可靠性,不够。即使你能做到,也会很“吃力”。建议用RabbitMQ,redis实现的队列一般用于统计播放量、搜索热度、点击率、访客数量记录、等准备率不需要太高的场景。

实现原理

消息推送实现方法php+redis,压入一个消息,然后服务器自动提取出来发送出去,redis有个列表结构可以实现这个功能,重要的是它有个方法可以从列表中取数据,这个方法可以设置阻塞(超时)获取,如果列表中没有消息则可以等待,实现代码如下

压入消息

$redis = new redis();
$redis->connect('127.0.0.1', 6379) or die('connect error!');
$redis->select(8);
$redis->lpush('msglist', rand(1000, 9999));

服务器取出并通知

ini_set('default_socket_timeout', -1);
$redis = new redis();
$redis->pconnect('127.0.0.1', 6379) or die('connect error!');
$redis->select(8);
while (true) {
    //等100秒
    var_dump($redis->blpop('msglist', 100));
}

注意上面服务器连接用的是长链接,并且设置啦socket的超时时间为永不超时,效果如下

image.png

使用enqueue/redis实现

安装

有轮子用当然直接用喽,安装enqueue/redis,文档地址 https://php-enqueue.github.io/transport/redis/

composer require enqueue/redis

创建上下文

连接redis并且创建一个上下文件对象

use Enqueue\Redis\RedisConnectionFactory;
//包含自动加载类
$loader  = require __DIR__ . '/../vendor/autoload.php';
$factory = new RedisConnectionFactory([
    'host'              => '127.0.0.1',
    'port'              => 6379,
    'scheme_extensions' => ['phpredis'],
]);
$context = $factory->createContext();

生产消息

$fooQueue = $context->createQueue('redismq:aQueue');
$message  = $context->createMessage('Hello world!');
$context->createProducer()->send($fooQueue, $message);

生产一个定时过期消息

下面消息60后会自动从队列中删除

$message = $context->createMessage('Hello world!');
$context->createProducer()
    ->setTimeToLive(60000) // 60 sec
    ->send($fooQueue, $message);

生产一个延期消息

5秒之后消息能会被加入队列

$message = $context->createMessage('Hello world!');
$context->createProducer()
    ->setDeliveryDelay(5000) // 5 sec
    ->send($fooQueue, $message);

消费消息

$fooQueue = $context->createQueue('redismq:aQueue');
$consumer = $context->createConsumer($fooQueue);
while (true) {
    //接收消息 默认是5000毫秒超时,这个地方的超时会影响延期消息的处理不能设置太大,请注意下
    $message = $consumer->receive();
    if ($message) {
        $body = $message->getBody();
        echo $body;
        //...业务处理
        // 确认消费并删除消息,如果不执行下面则此条消息会被标识为已经接收,但并不会删除
        $consumer->acknowledge($message);
        //$consumer->reject($message);
    }
}

删除队列(清空消息)

$context->deleteQueue($fooQueue);

redis保存的数据结构如下

1910251571991338803278.png


微信号:kelicom QQ群:215861553 紧急求助须知
点击更换验证码
Win32/PHP/JS/Android/Python