Redis中如何实现限流策略

81次阅读
没有评论

共计 4693 个字符,预计需要花费 12 分钟才能阅读完成。

这篇文章将为大家详细讲解有关 Redis 中如何实现限流策略,丸趣 TV 小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

一、简单的限流基本原理

当系统处理能力有限,如何组织计划外的请求对系统施压。首先我们先看下一些简单的限流策略,防止暴力攻击。比如要对 IP 访问,没 5s 只能访问 10 次,超过进行拦截。

如上图,一般使用滑动窗口来统计区间时间内的访问次数。
使用 zset 记录 IP 访问次数,每个 IP 通过 key 保存下来,score 保存当前时间戳,value 唯一用时间戳或者 UUID 来实现

代码实现

public class RedisLimiterTest {
 private Jedis jedis;
 public RedisLimiterTest(Jedis jedis) {
 this.jedis = jedis;
 }
 /**
 * @param ipAddress Ip 地址
 * @param period  特定的时间内,单位秒
 * @param maxCount  最大允许的次数
 * @return
 */
 public boolean isIpLimit(String ipAddress, int period, int maxCount) { String key = String.format( ip:%s , ipAddress);
 //  毫秒时间戳
 long currentTimeMillis = System.currentTimeMillis();
 Pipeline pipe = jedis.pipelined();
 // redis 事务,保证原子性
 pipe.multi();
 //  存放数据,value  和  score  都使用毫秒时间戳
 pipe.zadd(key, currentTimeMillis,   + UUID.randomUUID());
 //  移除窗口区间所有的元素
 pipe.zremrangeByScore(key, 0, currentTimeMillis - period * 1000);
 //  获取时间窗口内的行为数量
 Response Long  count = pipe.zcard(key);
 //  设置  zset  过期时间,避免冷用户持续占用内存,这里宽限 1s
 pipe.expire(key, period + 1);
 //  提交事务
 pipe.exec();
 pipe.close();
 //  比较数量是否超标
 return count.get()   maxCount;
 }
 public static void main(String[] args) { Jedis jedis = new Jedis( localhost , 6379);
 RedisLimiterTest limiter = new RedisLimiterTest(jedis);
 for (int i = 1; i  = 20; i++) {
 //  验证 IP 10 秒钟之内只能访问 5 次
 boolean isLimit = limiter.isIpLimit(222.73.55.22 , 10, 5);
 System.out.println(访问第  + i +  次,  结果: + (isLimit ?  限制访问  :  允许访问));
 }
 }
}

执行结果

 访问第 1 次,  结果:允许访问
访问第 2 次,  结果:允许访问
访问第 3 次,  结果:允许访问
访问第 4 次,  结果:允许访问
访问第 5 次,  结果:允许访问
访问第 6 次,  结果:限制访问
访问第 7 次,  结果:限制访问
... ...

缺点:要记录时间窗口所有的行为记录,量很大,比如,限定 60s 内不能超过 100 万次这种场景,不太适合这样限流,因为会消耗大量的储存空间。

二、漏斗限流基本原理

漏斗的容量是限定的,如果满了,就装不进去了。

如果将漏嘴放开,水就会往下流,流走一部分之后,就又可以继续往里面灌水。

如果漏嘴流水的速率大于灌水的速率,那么漏斗永远都装不满。

如果漏嘴流水速率小于灌水的速率,那么一旦漏斗满了,灌水就需要暂停并等待漏斗腾空。

示例代码

public class FunnelLimiterTest {
 static class Funnel {
 int capacity; //  漏斗容量
 float leakingRate; //  漏嘴流水速率
 int leftQuota; //  漏斗剩余空间
 long leakingTs; //  上一次漏水时间
 public Funnel(int capacity, float leakingRate) {
 this.capacity = capacity;
 this.leakingRate = leakingRate;
 this.leftQuota = capacity;
 this.leakingTs = System.currentTimeMillis();
 }
 void makeSpace() { long nowTs = System.currentTimeMillis();
 long deltaTs = nowTs - leakingTs; //  距离上一次漏水过去了多久
 int deltaQuota = (int) (deltaTs * leakingRate); //  腾出的空间  =  时间 * 漏水速率
 if (deltaQuota   0) { //  间隔时间太长,整数数字过大溢出
 this.leftQuota = capacity;
 this.leakingTs = nowTs;
 return;
 }
 if (deltaQuota   1) { //  腾出空间太小   就等下次,最小单位是 1
 return;
 }
 this.leftQuota += deltaQuota; //  漏斗剩余空间  =  漏斗剩余空间  +  腾出的空间
 this.leakingTs = nowTs;
 if (this.leftQuota   this.capacity) { //  剩余空间不得高于容量
 this.leftQuota = this.capacity;
 }
 }
 boolean watering(int quota) { makeSpace();
 if (this.leftQuota  = quota) { //  判断剩余空间是否足够
 this.leftQuota -= quota;
 return true;
 }
 return false;
 }
 }
 //  所有的漏斗
 private Map String, Funnel  funnels = new HashMap ();
 /**
 * @param capacity  漏斗容量
 * @param leakingRate  漏嘴流水速率  quota/s
 */
 public boolean isIpLimit(String ipAddress, int capacity, float leakingRate) { String key = String.format( ip:%s , ipAddress);
 Funnel funnel = funnels.get(key);
 if (funnel == null) { funnel = new Funnel(capacity, leakingRate);
 funnels.put(key, funnel);
 }
 return !funnel.watering(1); //  需要 1 个 quota
 }
 public static void main(String[] args) throws Exception{ FunnelLimiterTest limiter = new FunnelLimiterTest();
 for (int i = 1; i  = 50; i++) {
 //  每 1s 执行一次
 Thread.sleep(1000);
 //  漏斗容量是 2  ,漏嘴流水速率是 0.5 每秒, boolean isLimit = limiter.isIpLimit(222.73.55.22 , 2, (float)0.5/1000);
 System.out.println(访问第  + i +  次,  结果: + (isLimit ?  限制访问  :  允许访问));
 }
 }
}

执行结果

 访问第 1 次,  结果:允许访问  #  第 1 次,容量剩余 2,执行后 1
访问第 2 次,  结果:允许访问  #  第 2 次,容量剩余 1,执行后 0
访问第 3 次,  结果:允许访问  #  第 3 次,由于过了 2s,  漏斗流水剩余 1 个空间,所以容量剩余 1,执行后 0
访问第 4 次,  结果:限制访问  #  第 4 次,过了 1s,  剩余空间小于 1,  容量剩余 0
访问第 5 次,  结果:允许访问  #  第 5 次,由于过了 2s,  漏斗流水剩余 1 个空间,所以容量剩余 1,执行后 0
访问第 6 次,  结果:限制访问  #  以此类推...
访问第 7 次,  结果:允许访问
访问第 8 次,  结果:限制访问
访问第 9 次,  结果:允许访问
访问第 10 次,  结果:限制访问 

我们观察 Funnel 对象的几个字段,我们发现可以将 Funnel 对象的内容按字段存储到一个 hash 结构中,灌水的时候将 hash 结构的字段取出来进行逻辑运算后,再将新值回填到 hash 结构中就完成了一次行为频度的检测。

但是有个问题,我们无法保证整个过程的原子性。从 hash 结构中取值,然后在内存里运算,再回填到 hash 结构,这三个过程无法原子化,意味着需要进行适当的加锁控制。而一旦加锁,就意味着会有加锁失败,加锁失败就需要选择重试或者放弃。

如果重试的话,就会导致性能下降。如果放弃的话,就会影响用户体验。同时,代码的复杂度也跟着升高很多。这真是个艰难的选择,我们该如何解决这个问题呢?Redis-Cell 救星来了!

Redis-Cell

Redis 4.0 提供了一个限流 Redis 模块,它叫 redis-cell。该模块也使用了漏斗算法,并提供了原子的限流指令。
该模块只有 1 条指令 cl.throttle,它的参数和返回值都略显复杂,接下来让我们来看看这个指令具体该如何使用。

 cl.throttle key:xxx 15 30 60 1

15 : 15 capacity 这是漏斗容量

30 60 : 30 operations / 60 seconds 这是漏水速率

1 : need 1 quota (可选参数,默认值也是 1)

 cl.throttle laoqian:reply 15 30 60
1) (integer) 0 # 0  表示允许,1 表示拒绝
2) (integer) 15 #  漏斗容量 capacity
3) (integer) 14 #  漏斗剩余空间 left_quota
4) (integer) -1 #  如果拒绝了,需要多长时间后再试 (漏斗有空间了,单位秒)
5) (integer) 2 #  多长时间后,漏斗完全空出来 (left_quota==capacity,单位秒)

在执行限流指令时,如果被拒绝了,就需要丢弃或重试。cl.throttle 指令考虑的非常周到,连重试时间都帮你算好了,直接取返回结果数组的第四个值进行 sleep 即可,如果不想阻塞线程,也可以异步定时任务来重试。

关于“Redis 中如何实现限流策略”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-07-15发表,共计4693字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)