共计 2333 个字符,预计需要花费 6 分钟才能阅读完成。
这篇文章将为大家详细讲解有关 ClickHouse 是如何批量写入的,文章内容质量较高,因此丸趣 TV 小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
简介
批量写入又称为 bulk write,对于单表插入多条数据的场景,可以减少插入请求数量,提高吞吐量和效率。clickhouse 官方 Golang 驱动 clickhouse-go[1]支持该关键特性,但是文档的介绍不是很详细,只有一句:
Bulk write support : begin- prepare- (in loop exec)- commit
并没有详细介绍用法和原理,笔者在开发业务时使用的库是 sqlx[2],sql 也支持 clickhouse-go 驱动。参考了官方样例代码[3]:
...
tx, err := connect.Begin()
checkErr(err)
stmt, err := tx.Prepare(INSERT INTO example (country_code, os_id, browser_id, categories, action_day, action_time) VALUES (?, ?, ?, ?, ?, ?) )
checkErr(err)
for i := 0; i 100; i++ {
if _, err := stmt.Exec(
RU ,
10+i,
100+i,
[]int16{1, 2, 3},
time.Now(),
time.Now(),
); err != nil {
log.Fatal(err)
}
}
...
我写的 bulk write 类似上面的代码,但是提交给同事 review 时,他提出了疑问:stmt.Exec 是每次执行都发送写请求到数据库吗?这个问题其实我不敢肯定,官方文档也说得不明确。考虑到严谨性,让自己的 PR 更有说服力,自己去翻看了相关源代码。
这里需要指出,如果利用编辑器里的代码跳转功能会跳到 database/sql 库中的 Exec 函数实现,实际上我们要看的代码是 clickhouse-go 中的实现,至于编辑器跳转到 database/sql 中的原因,书写此文时笔者也没弄清楚,先挖个坑吧。
核心实现
stmt.Exec 的核心代码如下[4]:
func (stmt *stmt) execContext(ctx context.Context, args []driver.Value) (driver.Result, error) {
if stmt.isInsert {
stmt.counter++
if err := stmt.ch.block.AppendRow(args); err != nil {
return nil, err
}
if (stmt.counter % stmt.ch.blockSize) == 0 {
stmt.ch.logf([exec] flush block )
if err := stmt.ch.writeBlock(stmt.ch.block); err != nil {
return nil, err
}
if err := stmt.ch.encoder.Flush(); err != nil {
return nil, err
}
}
return emptyResult, nil
}
if err := stmt.ch.sendQuery(stmt.bind(convertOldArgs(args))); err != nil {
return nil, err
}
if err := stmt.ch.process(); err != nil {
return nil, err
}
return emptyResult, nil
}
上面的代码不多,非常清晰,当执行 Exec 时,stmt.ch.block.AppendRow(args)会先把 sql 参数附加到本地缓存 block 中,然后 (stmt.counter % stmt.ch.blockSize) 判断本地缓存大小是否到达阈值,到达则执行 Flush(),将数据写入远端。综上,clickhouse-go 中的核心实现逻辑是:
底层维护一个缓存 block,同时设置 block_size 控制缓存大小执行 stmt.Exec 时,不会直接写入远程 ClickHouse 中,而是将插入参数 Append 到 block 中每次 Append 后,判断 block 的 size 和 block_size 的关系,如果正好整除,则刷新 block(即写入 clickhouse)
因此 block_size 这个参数很重要,它表示本地缓存的上限,如果很大的话,程序会占用一些内存。笔者起初设置为 100000,在调试日志中看不到 stmt.ch.logf([exec] flush block )打印的 log,设置小后就看到下面的输出:
...
[clickhouse][connect=1][begin] tx=false, data=false
[clickhouse][connect=1][prepare]
[clickhouse][connect=1][read meta] - data: packet=1, columns=6, rows=0
[clickhouse][connect=1][exec] flush block
[clickhouse][connect=1][exec] flush block
....
关于 ClickHouse 是如何批量写入的就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。