共计 2893 个字符,预计需要花费 8 分钟才能阅读完成。
这篇文章给大家分享的是有关 redis 如何通过 pipeline 提升吞吐量的内容。丸趣 TV 小编觉得挺实用的,因此分享给大家做个参考,一起跟随丸趣 TV 小编过来看看吧。
案例目标
简单介绍 redis pipeline 的机制,结合一段实例说明 pipeline 在提升吞吐量方面发生的效用。
案例背景
应用系统在数据推送或事件处理过程中,往往出现数据流经过多个网元;
然而在某些服务中,数据操作对 redis 是强依赖的,在最近的一次分析中发现:
一次数据推送会对 redis 产生近 30 次读写操作!
在数据推送业务中的性能压测中,以数据上报 – 下发应答为一次事务;而对于这样的读写模型,redis 的操作过于频繁,很快便导致系统延时过高,吞吐量低下,无法满足目标;
优化过程 主要针对业务代码做的优化,其中 redis 操作经过大量合并,最终降低到原来的 1 /5,而系统吞吐量也提升明显。
其中,redis pipeline(管道机制) 的应用是一个关键手段。
pipeline 的解释
Pipeline 指的是管道技术,指的是客户端允许将多个请求依次发给服务器,过程中而不需要等待请求的回复,在最后再一并读取结果即可。
管道技术使用广泛,例如许多 POP3 协议已经实现支持这个功能,大大加快了从服务器下载新邮件的过程。Redis 很早就支持管道(pipeline)技术。(因此无论你运行的是什么版本,你都可以使用管道(pipelining)操作 Redis)
普通请求模型
[图 -pipeline1]
Pipeline 请求模型
[图 -pipeline2]
从两个图的对比中可看出,普通的请求模型是同步的,每次请求对应一次 IO 操作等待;
而 Pipeline 化之后所有的请求合并为一次 IO,除了时延可以降低之外,还能大幅度提升系统吞吐量。
代码实例
说明
本地开启 50 个线程,每个线程完成 1000 个 key 的写入,对比 pipeline 开启及不开启两种场景下的性能表现。
相关常量
// 并发任务
private static final int taskCount = 50;
// pipeline 大小
private static final int batchSize = 10;
// 每个任务处理命令数
private static final int cmdCount = 1000;
private static final boolean usePipeline = true;
初始化连接
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxActive(200);
poolConfig.setMaxIdle(100);
poolConfig.setMaxWait(2000);
poolConfig.setTestOnBorrow(false);
poolConfig.setTestOnReturn(false);
jedisPool = new JedisPool(poolConfig, host, port);
并发启动任务,统计执行时间
public static void main(String[] args) throws InterruptedException { init();
flushDB();
long t1 = System.currentTimeMillis();
ExecutorService executor = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(taskCount);
for (int i = 0; i taskCount; i++) { executor.submit(new DemoTask(i, latch));
}
latch.await();
executor.shutdownNow();
long t2 = System.currentTimeMillis();
System.out.println(execution finish time(s): + (t2 - t1) / 1000.0);
}
DemoTask 封装了执行 key 写入的细节,区分不同场景
public void run() { logger.info( Task[{}] start. , id);
try { if (usePipeline) { runWithPipeline();
} else { runWithNonPipeline();
}
} finally { latch.countDown();
}
logger.info(Task[{}] end. , id);
}
不使用 Pipeline 的场景比较简单,循环执行 set 操作
for (int i = 0; i cmdCount; i++) { Jedis jedis = get();
try { jedis.set(key(i), UUID.randomUUID().toString());
} finally { if (jedis != null) { jedisPool.returnResource(jedis);
}
}
if (i % batchSize == 0) { logger.info( Task[{}] process -- {} , id, i);
}
}
使用 Pipeline,需要处理分段,如 10 个作为一批命令执行
for (int i = 0; i cmdCount;) { Jedis jedis = get();
try { Pipeline pipeline = jedis.pipelined();
int j;
for (j = 0; j batchSize; j++) { if (i + j cmdCount) { pipeline.set(key(i + j), UUID.randomUUID().toString());
} else {
break;
}
}
pipeline.sync();
logger.info(Task[{}] pipeline -- {} , id, i + j);
i += j;
} finally { if (jedis != null) { jedisPool.returnResource(jedis);
}
}
}
运行结果
不使用 Pipeline,整体执行 26s;而使用 Pipeline 优化后的代码,执行时间仅需要 3s!
NoPipeline-stat
Pipeline-stat
注意事项
pipeline 机制可以优化吞吐量,但无法提供原子性 / 事务保障,而这个可以通过 Redis-Multi 等命令实现。
感谢各位的阅读!关于“redis 如何通过 pipeline 提升吞吐量”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!