redis如何实现队列的阻塞、延时、发布和订阅

67次阅读
没有评论

共计 6657 个字符,预计需要花费 17 分钟才能阅读完成。

这篇文章主要介绍了 redis 如何实现队列的阻塞、延时、发布和订阅的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇 redis 如何实现队列的阻塞、延时、发布和订阅文章都会有所收获,下面我们一起来看看吧。

Redis 不仅可作为缓存服务器,还可以用作消息队列。它的列表类型天生支持用作消息队列。如下图所示:

由于 Redis 的列表是使用双向链表实现的,保存了头节点和尾节点,所以在列表的头部和尾部两边插入或获取元素都是非常快的,时间复杂度为 O(1)。

普通队列

可以直接使用 Redis 的 list 数据类型实现消息队列,只需简单的两个指令 lpush 和 rpop 或者 rpush 和 lpop。

lpush+rpop:左进右出的队列

rpush+lpop:左出右进的队列

下面使用 redis 的命令来模拟普通队列。
使用 lpush 命令生产消息:

lpush queue:single 1 1 lpush queue:single 2 2 lpush queue:single 3 3

使用 rpop 命令消费消息:

rpop queue:single 1 rpop queue:single 2 rpop queue:single 3

下面使用 Java 代码来实现普通队列。

生产者 SingleProducer

package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;/**
 *  生产者
 */public class SingleProducer {
 public static final String SINGLE_QUEUE_NAME =  queue:single 
 public static void main(String[] args) { Jedis jedis = new Jedis();
 for (int i = 0; i   100; i++) { jedis.lpush(SINGLE_QUEUE_NAME,  hello   + i);
 }
 jedis.close();
 }}

消费者 SingleConsumer:

package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;import java.util.Objects;import java.util.concurrent.TimeUnit;/**
 *  消费者
 */public class SingleConsumer { public static void main(String[] args) throws InterruptedException { Jedis jedis = new Jedis();
 while (true) { String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);
 if(Objects.nonNull(message)) { System.out.println(message);
 } else { TimeUnit.MILLISECONDS.sleep(500);
 }
 }
 }}

上面的代码已经基本实现了普通队列的生产与消费,但是上述的例子中消息的消费者存在两个问题:

消费者需要不停的调用 rpop 方法查看 redis 的 list 中是否有待处理的数据(消息)。每调用一次都会发起一次连接,有可能 list 中没有数据,造成大量的空轮询,导致造成不必要的浪费。也许你可以使用 Thread.sleep() 等方法让消费者线程隔一段时间再消费,如果睡眠时间过长,这样不能处理一些时效性要求高的消息,睡眠时间过短,也会在连接上造成比较大的开销。

如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。

阻塞队列

消费者可以使用 brpop 指令从 redis 的 list 中获取数据,这个指令只有在有元素时才返回,没有则会阻塞直到超时返回 null,于是消费端就不需要休眠后获取数据了,这样就相当于实现了一个阻塞队列,

使用 redis 的 brpop 命令来模拟阻塞队列。

brpop queue:single 30

可以看到命令行阻塞在了 brpop 这里了,30s 后没数据就返回。

Java 代码实现如下:

生产者与普通队列的生产者一致。

消费者 BlockConsumer:

package com.morris.redis.demo.queue.block;import redis.clients.jedis.Jedis;import java.util.List;/**
 *  消费者
 */public class BlockConsumer { public static void main(String[] args) { Jedis jedis = new Jedis();
 while (true) {
 //  超时时间为 1s
 List String  messageList = jedis.brpop(1, BlockProducer.BLOCK_QUEUE_NAME);
 if (null != messageList   !messageList.isEmpty()) { System.out.println(messageList);
 }
 }
 }}

缺点:无法实现一次生产多次消费。

发布订阅模式

Redis 除了对消息队列提供支持外,还提供了一组命令用于支持发布 / 订阅模式。利用 Redis 的 pub/sub 模式可以实现一次生产多次消费的队列。

发布:PUBLISH 指令可用于发布一条消息,格式:

PUBLISH channel message

返回值表示订阅了该消息的数量。

订阅:SUBSCRIBE 指令用于接收一条消息,格式:

SUBSCRIBE channel

使用 SUBSCRIBE 指令后进入了订阅模式,但是不会接收到订阅之前 publish 发送的消息,这是因为只有在消息发出去前订阅才会接收到。在这个模式下其他指令,只能看到回复。

回复分为三种类型:

如果为 subscribe,第二个值表示订阅的频道,第三个值表示是已订阅的频道的数量

如果为 message(消息),第二个值为产生该消息的频道,第三个值为消息

如果为 unsubscribe,第二个值表示取消订阅的频道,第三个值表示当前客户端的订阅数量。

下面使用 redis 的命令来模拟发布订阅模式。

生产者:

127.0.0.1:6379  publish queue hello(integer) 1127.0.0.1:6379  publish queue hi(integer) 1

消费者:

127.0.0.1:6379  subscribe queue
Reading messages... (press Ctrl-C to quit)1)  subscribe 2)  queue 3) (integer) 11)  message 2)  queue 3)  hello 1)  message 2)  queue 3)  hi

Java 代码实现如下:

生产者 PubsubProducer:

package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;/**
 *  生产者
 */public class PubsubProducer {
 public static final String PUBSUB_QUEUE_NAME =  queue:pubsub 
 public static void main(String[] args) { Jedis jedis = new Jedis();
 for (int i = 0; i   100; i++) { jedis.publish(PUBSUB_QUEUE_NAME,  hello   + i);
 }
 jedis.close();
 }}

消费者 PubsubConsumer:

package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;/**
 *  消费者
 */public class PubsubConsumer { public static void main(String[] args) throws InterruptedException { Jedis jedis = new Jedis();
 JedisPubSub jedisPubSub = new JedisPubSub() {
 @Override
 public void onMessage(String channel, String message) { System.out.println( receive message:   + message);
 if(message.indexOf( 99)   -1) { this.unsubscribe();
 }
 }
 @Override
 public void onSubscribe(String channel, int subscribedChannels) { System.out.println( subscribe channel:   + channel);
 }
 @Override
 public void onUnsubscribe(String channel, int subscribedChannels) { System.out.println( unsubscribe channel   + channel);
 }
 };
 jedis.subscribe(jedisPubSub, PubsubProducer.PUBSUB_QUEUE_NAME);
 }}

消费者可以启动多个,每个消费者都能收到所有的消息。

可以使用指令 UNSUBSCRIBE 退订,如果不加参数,则会退订所有由 SUBSCRIBE 指令订阅的频道。

Redis 还支持基于通配符的消息订阅,使用指令 PSUBSCRIBE (pattern subscribe),例如:

psubscribe channel.*

用 PSUBSCRIBE 指令订阅的频道也要使用指令 PUNSUBSCRIBE 指令退订,该指令无法退订 SUBSCRIBE 订阅的频道,同理 UNSUBSCRIBE 也不能退订 PSUBSCRIBE 指令订阅的频道。

同时 PUNSUBSCRIBE 指令通配符不会展开。例如:PUNSUBSCRIBE \* 不会匹配到 channel.\*,所以要取消订阅 channel.\* 就要这样写 PUBSUBSCRIBE channel.\*。

Redis 的 pub/sub 也有其缺点,那就是如果消费者下线,生产者的消息会丢失。

延时队列和优先级队列

Redis 中有个数据类型叫 Zset,其本质就是在数据类型 Set 的基础上加了个排序的功能而已,除了保存原始的数据 value 之外,还提供另一个属性 score,这一属性在添加修改元素时候可以进行指定,每次指定后,Zset 会自动重新按新的 score 值进行排序。

如果 score 字段设置为消息的优先级,优先级最高的消息排在第一位,这样就能实现一个优先级队列。

如果 score 字段代表的是消息想要执行时间的时间戳,将它插入 Zset 集合中,便会按照时间戳大小进行排序,也就是对执行时间先后进行排序,集合中最先要执行的消息就会排在第一位,这样的话,只需要起一个死循环线程不断获取集合中的第一个元素,如果当前时间戳大于等于该元素的 score 就将它取出来进行消费删除,就可以达到延时执行的目的,注意不需要遍历整个 Zset 集合,以免造成性能浪费。

下面使用 redis 的 zset 来模拟延时队列。

生产者:

127.0.0.1:6379  zadd queue:delay 1 order1 2 order2 3 order3(integer) 0

消费者:

127.0.0.1:6379  zrange queue:delay 0 0 withscores1)  order1 2)  1 127.0.0.1:6379  zrem queue:delay order1(integer) 1

Java 代码如下:

生产者 DelayProducer:

package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import java.util.Date;import java.util.Random;/**
 *  生产者
 */public class DelayProducer {
 public static final String DELAY_QUEUE_NAME =  queue:delay 
 public static void main(String[] args) { Jedis jedis = new Jedis();
 long now = new Date().getTime();
 Random random = new Random();
 for (int i = 0; i   10; i++) { int second = random.nextInt(30); //  随机订单失效时间
 jedis.zadd(DELAY_QUEUE_NAME, now + second * 1000,  order +i);
 }
 jedis.close();
 }}

消费者:

package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import redis.clients.jedis.Tuple;import java.util.Date;import java.util.List;import java.util.Set;import java.util.concurrent.TimeUnit;/**
 *  消费者
 */public class DelayConsumer { public static void main(String[] args) throws InterruptedException { Jedis jedis = new Jedis();
 while (true) { long now = new Date().getTime();
 Set Tuple  tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0);
 if(tupleSet.isEmpty()) { TimeUnit.MILLISECONDS.sleep(500);
 } else { for (Tuple tuple : tupleSet) { Double score = tuple.getScore();
 long time = score.longValue();
 if(time   now) { jedis.zrem(DelayProducer.DELAY_QUEUE_NAME, tuple.getElement());
 System.out.println(order[  + tuple.getElement() + ] is timeout at   + time);
 } else { TimeUnit.MILLISECONDS.sleep(500);
 }
 break;
 }
 }
 }
 }}

应用场景

延时队列可用于订单超时失效的场景

二级缓存(local+redis)中,当有缓存需要更新时,可以使用发布订阅模式通知其他服务器使得本地缓存失效。

关于“redis 如何实现队列的阻塞、延时、发布和订阅”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“redis 如何实现队列的阻塞、延时、发布和订阅”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注丸趣 TV 行业资讯频道。

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-07-13发表,共计6657字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)