Redis中如何使用消息队列

70次阅读
没有评论

共计 2326 个字符,预计需要花费 6 分钟才能阅读完成。

这篇文章主要介绍了 Redis 中如何使用消息队列,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让丸趣 TV 小编带着大家一起了解一下。

说到消息队列中间件,我们都会想到 RabbitMQ、RocketMQ 和 Kafka,来给应用实现异步消息传递的功能。这些都是专业的消息队列中间件,其特性之多超出了我们的理解能力。

而这些消息中间件使用起来的是复杂的,例如 RabbitMQ,发消息之前要创建 Exchange,还要创建 Queue,然后将 Exchange 和 Queue 通过某种规则绑定起来,发送消息的时候还要制定 routing-key,还要 控制头消息。这仅是生产者,消费者在消费消息之前也要将上面一系列的繁琐步骤再操作一遍。

那么对于那些并不要求百分百可靠,并且希望实现简单的消息队列需求时,我们可以通过 Redis 将我们从消息队列的中间件的繁琐步骤中解脱出来。

Redis 的消息队列不是专业的消息队列,他并没有消息队列中许多的高级特性,也没有 ack 保证。如果对消息的可靠性有着极致的追求,请转向专业的 MQ 中间件。

异步消息队列

从最简单的异步消息队列开始,Redis 的 list 数据结构常用来作为异步消息队列,通过 lrpush/lpush 来操作入列,通过 rpop/lpop 来出列。

问题一:空队列

对于 pop 操作来说,当消息队列空了的时候,客户端会陷入 pop 的死循环,造成大量的浪费生命的空轮询,导致客户端 CPU 拉高,同时 Redis 的 QPS 也被拉高。

对于以上问题的解决办法就是通过 list 结构的 blpop/brpop 来操作出列,其中 b 前缀代表的就是 blocking,阻塞读。对于阻塞读在队列没有数据的时候就会进入休眠状态,一旦数据到来就会立刻醒来。完美的解决了上面这个问题。

问题二:空闲连接断开

阻塞读的方案看似完美,紧接着引出了另外一个问题:空闲连接。如果线程一直阻塞在哪哪里,Redis 的客户端连接就变成了空闲连接。空闲时间过长,Redis 服务器就会主动断开连接,以减少闲置资源占用。这时候 blpop/brpop 就会抛出异常来。

所以,我们在编写客户端(应用程序)消费者的时候需要小心,注意捕获异常,并进行重试。

应用一:延时队列

在 Redis 的分布式锁中一般有三种策略来处理加锁失败的情况:

直接抛出异常,前端提醒用户是否要继续操作;

sleep 一会再重试;

将请求放到延时队列中,一会再重试;

而 Redis 中延时队列,我们可以通过 zset(有序列表)数据结构来实现。我们将消息序列化作为一个字符串作为 zse 的 value,而消息的到期处理时间(延时时间)作为 score。然后通过轮询 zset 获取到期时间进行处理,通过 zrem 将 key 从 zset 移除代表成功消费,进而处理任务。

核心代码如下:

//  生产 \
public void delay(T msg) {\
 TaskItem task = new TaskItem();\
 task.id = UUID.randomUUID().toString(); //  分配唯一的  uuid\
 task.msg = msg;\
 String s = JSON.toJSONString(task); // fastjson  序列化 \
 jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); //  塞入延时队列  ,5s  后再试 \
//  消费 \
public void loop() {\
 while (!Thread.interrupted()) {\
 // zrangeByScore 参数中 0, System.currentTimeMills() 代表从 redis 中去 score 范围在 0 到系统当前时间的数据, 0,1 表示从 0 开始取 1 个   拓展传入的 score 为 -inf, +inf  分别表示 zset 中的最大值和最小值,当你不知道 zset 中的 score 最值时就可以使用 inf 作为参数变量 \
 Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);\
 if (values.isEmpty()) {\
 try {\
 Thread.sleep(500); //  歇会继续 \
 }\
 catch (InterruptedException e) {\
 break;\
 }\
 continue;\
 }\
 String s = values.iterator().next(); // 消费队列 \
 if (jedis.zrem(queueKey, s)   0) { //  抢到了,要考虑到多线程下锁争抢的情况,只有 rem 成功代表成功的消费了一条消息。\
 TaskItem task = JSON.parseObject(s, TaskType); // fastjson  反序列化 \
 this.handleMsg(task.msg);\
 }\
}

以上的代码在多线程中对于同一个任务被多个线程争抢的情况,虽然能够通过 zrem 后在处理任务来避免一个任务被多次消费的情况。但是对于那些获取到了任务但是没有成功消费的线程来说,都是白白浪费时间取了一次任务。所以可以考虑通过 lua scripting 来优化这个逻辑。将 zrangeByScore 和 zrem 一同挪到服务器进行原子操作,就能够完美解决了。

感谢你能够认真阅读完这篇文章,希望丸趣 TV 小编分享的“Redis 中如何使用消息队列”这篇文章对大家有帮助,同时也希望大家多多支持丸趣 TV,关注丸趣 TV 行业资讯频道,更多相关知识等着你来学习!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-07-15发表,共计2326字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)