Redis中如何实现消息队列和延时消息队列

86次阅读
没有评论

共计 2458 个字符,预计需要花费 7 分钟才能阅读完成。

这篇文章将为大家详细讲解有关 Redis 中如何实现消息队列和延时消息队列,丸趣 TV 小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

list 的几个命令

lpush (left push)

由队列的左边存放进去

rpush (right push)

由队列的右边存放进去

lpop  (left pop)

由队列的左边取出来

rpop (right pop)

由队列的右边取出来

以上的四个命令,可以让 list 帮我们实现队列 或者 栈,队列的特性是先进先出,栈的特性是先进后出,

所以队列的实现可以使用 lpush + rpop 或者 rpush + lpop,

栈的实现则是 lpush + lpop 或者 rpush + rpop。

使用命令演示队列

生产者发布消息

首先我们使用 rpush 对一个叫做 notify-queue 的队列,增加五个元素,即 1 2 3 4 5,也就是作为生产者发布消息啦

消费者消费消息

既然生产者使用的是 rpush,那么消费者就要用 lpop,可以看下下图,我们不停对 notify-queue 进行消息消费,并且是按照顺序的,从 1 一直到 5,按顺序读出,最终队列中没有消息了,弹出的则一直是空

空轮询问题

在上面使用 lpop 消费消息时,可以看到,消息消费完后,我们每次再去 pop 时,读到的都是一个空的信息,

上面是手动执行命令,但是如果是写好的代码程序不停的去 pop 数据 (拉取数据) 的话,会造成空轮询(无用的读取),

既拉高了客户端的 CPU 消耗,又拉高了 redis 的 QPS,并且还是无用操作,这些无用操作可能会造成其他客户端对 redis 的访问变得响应缓慢。

解决方案 A (休眠)

既然空轮询会让客户端和 redis 的资源消耗都会变得较高,那么我们可以让客户端在收到空数据的时候,进行 1s 的休眠,1s 后再进行数据拉取,这样可以降低消耗

Thread.sleep(1000)

这个方案也是存在瑕疵的,即消息消费延迟性增大了,如果只有一个消费者的话,延迟就是 1s,即空轮询后,正好休眠了,但是这时候刚好有消息过来了,还是要等到 1s 醒来后才能消费,

如果有多个消费者的话,由于每个消费者的睡眠时间是岔开的,会降低一些延迟性,但是有没有办法更好的方法,可以做到几乎 0 延迟?

解决方案 B (阻塞读)

redis 中关于队列取数据其实还有两个命令,即阻塞读取,

blpop (blocking left pop)

brpop (blocking right pop)

阻塞读在队列没有数据的时候,会进入休眠状态,一旦有消息来了以后,则立刻做出反应,读取数据,因此使用 blpop/brpop 替换 lpop/rpop 则可以解决消息延迟性的问题,

继续入队 3 个属性,6、7、8

使用 blpop 进行队列的读取,最后一个参数是阻塞读的等待时间,如果超过这个时间还没有消息,将会返回 nil,此时可以继续重复 blpop 操作,

阻塞读的空闲连接自动断开问题

客户端使用阻塞读时,如果阻塞的时间过长,服务一般会当成空闲连接,从而对其进行主动断开,减少无用的连接占用资源,这个时候客户端会抛出异常,

所以请注意,在客户端使用阻塞读的时候,要进行异常的捕获,从而做出相应的处理,例如重试。

java 客户端实现消息队列

思路和上述一样,只是由命令行客户端 redis-cli 变成了 java 语言,一个线程或多个线程进行 rpush 的发布,

另外一个或多个线程进行 blpop 消费,完成的代码在:https://github.com/qiaomengnan16/redis-demo/tree/main/redis-queue

发布者

订阅者

延时队列的实现思路

延时队列指的是,消息发送的一段时间后,再由消费者进行消费,而不是发送过去后,消费者就能立即读取到,

zset 的可以帮我们做到这个事情,首先 zset 可以通过 score 进行排序,score 可以存一个时间戳,所以我们每次发布消息的时候,用当前时间戳加上延时的时间戳,

随后消费者取消息的时候,通过截取 zset 的数据,取到已经满足当前时间的消息(即取 score 小于等于当前时间戳的数据,score 小于等于当前时间戳代表消息已经到时间了,如果大于的话,说明还得等一会才能消费)。

关键命令 zadd (发布者),zrangebyscore(订阅者),zrem (订阅者消费完数据后删除)

命令实现

我们使用 zadd 添加了 4 个数据,分别是 1、2、3 秒 (伪说法,这里其实只是个 score) 后才能消费的数据,还有一个 10 秒后才能消费的 kafka,

假如现在已经到了第三秒,我们取 zset 中大于等于 1 秒的和小于等于 3 秒的数据,因为这个区间的数据正好是我们可以消费的,可以看到,我们取出了符合条件的 3 条数据,

如果每次只能消费一个数据的话,可以加一个 limit 限制条件,可以看下图取出了第一个可以消费的数据,redis

同时注意,和 list 的 lpop/ 和 blpop 不同(它们弹出即自动删除原始队列里的该数据),

虽然获取到了数据,但是如果不使用 zrem 进行删除的话,这条数据还会被其他人读到,因为他还一直存在 zset 中,

不过 zrem 可能会发生已经被别人抢先一步删除 (消费) 的情况,所以代码中还需要根据 zrem 的返回值是否大于 0 判断,本次消息我们是否抢占成功,成功后再进行正确消费。

代码实现

发布者

Redis 中如何实现消息队列和延时消息队列

订阅者

Redis 中如何实现消息队列和延时消息队列

测试延迟效果

Redis 中如何实现消息队列和延时消息队列

完整代码地址:https://github.com/qiaomengnan16/redis-demo/tree/main/redis-delayed-queue

优化, 使用 lua 实现

上面实现的延迟队列中,有一个问题,就是使用 zrem 判断是否抢到这个数据时,很有可能没有抢到,这样继续进行读取,可能几轮都抢不到,资源白白浪费了,所以可以通过 lua 脚本,来进行优化,

让 zrangebyscore 和 zrem 成为一个原子化操作,这就可以避免多线程争抢,抢不到的资源浪费了。

Redis 中如何实现消息队列和延时消息队列

Redis 中如何实现消息队列和延时消息队列

关于“Redis 中如何实现消息队列和延时消息队列”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-07-17发表,共计2458字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)