1 前言
数据写入链路是业务数据(binlog)经过处理转换成固定格式的MQ消息,Flink订阅不同Topic来接收不同生产系统的表数据,进行关联、计算、过滤、补充基础数据等加工关联汇总成宽表,最后将加工后的DataStream数据流双写入ES和ClickHouse。查询服务通过JSF和物流网关对外暴露提供给外部进行展示,由于ClickHouse将所有计算能力都用在一次查询上,所以不擅长高并发查询。我们通过对部分实时聚合指标接口增加缓存,或者定时任务查询ClickHosue计算指标存储到ES,部分指标不再实时查ClickHouse而是查ES中计算好的指标来抗住并发,并且这种方式能够极大提高开发效率,易维护,能够统一指标口径。
2 遇到的问题
前文说到遇到了很多困难,下面这些遇到的问题是本文讲述的重点内容。
- 我们该使用什么表引擎
- Flink如何写入到ClickHouse
- 查询ClickHouse为什么要比查询ES慢1~2分钟
- 写入分布式表还是本地表
- 为什么只有某个分片CPU使用率高
- 如何定位是哪些SQL在消耗CPU,这么多慢SQL,我怎么知道是哪个SQL引起的
- 找到了慢SQL,如何进行优化
- 如何抗住高并发、保证ClickHouse可用性
3 表引擎选择与查询方案
ClickHouse有很多表引擎,表引擎决定了数据以什么方式存储,以什么方式加载,以及数据表拥有什么样的特性。目前ClickHouse表引擎一共分为四个系列,分别是Log、MergeTree、Integration、Special。
- Log系列:适用于少量数据(小于一百万行)的场景,不支持索引,所以对于范围查询效率不高。
- Integration系列:主要用于导入外部数据到ClickHouse,或者在ClickHouse中直接操作外部数据,支持Kafka、HDFS、JDBC、Mysql等。
- Special系列:比如Memory将数据存储在内存,重启后会丢失数据,查询性能极好,File直接将本地文件作为数据存储等大多是为了特定场景而定制的。
- MergeTree系列:MergeTree家族自身拥有多种引擎的变种,其中MergeTree作为家族中最基础的引擎提供主键索引、数据分区、数据副本和数据采样等能力并且支持极大量的数据写入,家族中其他引擎在MergeTree引擎的基础上各有所长。
MergeTree系列的表引擎是为插入大量数据而生,数据是以数据片段的形式一个接一个的快速写入,ClickHouse为了避免数据片段过多会在后台按照一定的规则进行合并形成新的段,相比在插入时不断的修改已经存储在磁盘的数据,这种插入后合并再合并的策略效率要高很多。这种数据片段反复合并的特点,也正是MergeTree系列(合并树家族)名称的由来。为了避免形成过多的数据片段,需要进行批量写入。MergeTree系列包含MergeTree、ReplacingMergeTree、CollapsingMergeTree、VersionedCollapsingMergeTree、SummingMergeTree、AggregatingMergeTree引擎,下面就介绍下这几种引擎。
3.1 MergeTree:合并树
下面建表语句中,定义了订单号,商品数量,创建时间,更新时间。按照创建时间进行数据分区,orderNo作为主键(primary key),orderNo也作为排序键(order by),默认情况下主键和排序键相同,大部分情况不需要再专门指定primary key,这个例子中指定只是为了说明下主键和排序键的关系。当然排序键可以与的主键字段不同,但是主键必须为排序键的子集,例如主键(a,b), 排序键必须为(a,b, , ),并且组成主键的字段必须在排序键字段中的最左侧。
CREATE TABLE test_MergeTree ( orderNo String, number Int16, createTime DateTime, updateTime DateTime) ENGINE = MergeTree()PARTITION BY createTimeORDER BY (orderNo)PRIMARY KEY (orderNo);insert into test_MergeTree values('1', '20', '2021-01-01 00:00:00', '2021-01-01 00:00:00');insert into test_MergeTree values('1', '30', '2021-01-01 00:00:00', '2021-01-01 01:00:00');
注意这里写入的两条数据主键orderNo都是1的两条数据,这个场景是我们先创建订单,再更新了订单的商品数量为30和更新时间,此时业务实际订单量为1,商品件量是30。
3.2 ReplacingMergeTree:替换合并树
MergeTree虽然有主键,但是不能对相同主键的数据进行去重,我们的业务场景不能有重复数据。ClickHouse提供了ReplacingMergeTree引擎用来去重,能够在合并分区时删除重复的数据。我理解的去重分两个方面,一个是物理去重,就是重复的数据直接被删除掉,另一个是查询去重,不处理物理数据,但是查询结果是已经将重复数据过滤掉的。
,Date或者DateTime,但是我试验Int类型也是可以支持的(ClickHouse 20.8.11)。ReplacingMergeTree在数据合并时物理数据去重,去重策略如下。
- 如果ver版本列未指定,相同主键行中保留最后插入的一行。
- 如果ver版本列已经指定,下面实例就指定了version列为版本列,去重是将会保留version值最大的一行,与数据插入顺序无关。
CREATE TABLE test_ReplacingMergeTree ( orderNo String, version Int16, number Int16, createTime DateTime, updateTime DateTime) ENGINE = ReplacingMergeTree(version)PARTITION BY createTimeORDER BY (orderNo)PRIMARY KEY (orderNo);1) insert into test_ReplacingMergeTree values('1', 1, '20', '2021-01-01 00:00:00', '2021-01-01 00:00:00');2) insert into test_ReplacingMergeTree values('1', 2, '30', '2021-01-01 00:00:00', '2021-01-01 01:00:00');3) insert into test_ReplacingMergeTree values('1', 3, '30', '2021-01-02 00:00:00', '2021-01-01 01:00:00');-- final方式去重select * from test_ReplacingMergeTree final;-- argMax方式去重select argMax(orderNo,version) as orderNo, argMax(number,version) as number,argMax(createTime,version),argMax(updateTime,version) from test_ReplacingMergeTree;
下图是在执行完前两条insert语句后进行三次查询的结果,三种方式查询均未对物理存储的数据产生影响,final、argMax方式只是查询结果是去重的。
- 普通查询:查询结果未去重,物理数据未去重(未合并分区文件)
- final去重查询:查询结果已去重,物理数据未去重(未合并分区文件)
- argMax去重查询:查询结果已去重,物理数据未去重(未合并分区文件)
由于后台的合并是在不确定时间执行的,执行合并命令,然后再使用普通查询,发现结果已经是去重后的数据,version=2,number=30 是我们想保留的数据。
ReplacingMergeTree具有如下特点
- 使用主键作为判断重复数据的唯一键,支持插入相同主键数据。
- 在合并分区的时候会触发删除重复数据的逻辑。但是合并的时机不确定,所以在查询的时候可能会有重复数据,但是最终会去重。可以手动调用optimize,但是会引发对数据大量的读写,不建议生产使用。
- 以数据分区为单位删除重复数据,当分区合并时,同一分区内的重复数据会被删除,不同分区的重复数据不会被删除。
- 可以通过final,argMax方式做查询去重,这种方式无论有没有做过数据合并,都可以得到正确的查询结果。
ReplacingMergeTree最佳使用方案
- 普通select查询:对时效不高的离线查询可以采用ClickHouse自动合并配合,但是需要保证同一业务单据落在同一个数据分区,分布式表也需要保证在同一个分片(Shard),这是一种最高效,最节省计算资源的查询方式。
- final方式查询:对于实时查询可以使用final,final是本地去重,需要保证同一主键数据落在同一个分片(Shard),但是不需要落在同一个数据分区,这种方式效率次之,但是与普通select相比会消耗一些性能,如果where条件对主键索引,二级索引,分区字段命中的比较好的话效率也可以完全可以使用。
- argMax方式查询:对于实时查询可以使用argMax,argMax的使用要求最低,咋查都能去重,但是由于它的实现方式,效率会低很多,也很消耗性能,不建议使用。后面9.4.3会配合压测数据与final进行对比。
3.3 CollapsingMergeTree/VersionedCollapsingMergeTree:折叠合并树
折叠合并树不再通过示例来进行说明。可参考官网示例。
- 如果sign=1比sign=-1的数据多至少一行,则保留最后一行sign=1的数据。
- 如果sign=-1比sign=1多至少一行,则保留第一行sign=-1的行。
- 如果sign=1与sign=-1的行数一样多,最后一行是sign=1,则保留第一行sign=-1和最后一行sign=1的数据。
- 如果sign=1与sign=-1的行数一样多,最后一行是sign=-1,则什么都不保留。
- 其他情况ClickHouse不会报错但会打印告警日志,这种情况下,查询的结果是不确定不可预知的。
在使用CollapsingMergeTree时候需要注意
- 使用optimize强制合并,同样也不建议在生产环境中使用效率极低并且消耗资源的强制合并。
- 改写查询方式,通过group by 配合有符号的sign列来完成。这种方式增加了使用的编码成本
CollapsingMergeTree还有一个弊端,对写入的顺序有严格的要求,如果按照正常顺序写入,先写入sign=1的行再写入sign=-1的行,能够正常合并,如果顺序反过来则不能正常合并。ClickHouse提供了VersionedCollapsingMergeTree,通过增加版本号来解决顺序问题。但是其他的特性与CollapsingMergeTree完全一致,也不能满足我们的需求
3.4 表引擎总结
6分片 2副本的集群CPU使用率最高在60%,系统整体稳定。下文的所有实践优化也都是基于ReplacingMergeTree引擎。
4 Flink如何写入ClickHouse
4.1 Flink版本问题
- 1.11版本之前包名为flink-jdbc
- 1.11版本(包含)之后包名为flink-connector-jdbc
两者对Flink中以不同方式写入ClickHouse Sink的支持情况如下:
4.2 构造ClickHouse Sink
/** * 构造Sink * @param clusterPrefix clickhouse 数据库名称 * @param sql insert 占位符 eq:insert into demo (id, name) values (?, ?) */public static SinkFunction getSink(String clusterPrefix, String sql) { String clusterUrl = LoadPropertiesUtil.appInfoProcessMap.get(clusterPrefix + CLUSTER_URL); String clusterUsername = LoadPropertiesUtil.appInfoProcessMap.get(clusterPrefix + CLUSTER_USER_NAME); String clusterPassword = LoadPropertiesUtil.appInfoProcessMap.get(clusterPrefix + CLUSTER_PASSWORD); return JdbcSink.sink(sql, new CkSinkBuilder<>(), new JdbcExecutionOptions.Builder().withBatchSize(200000).build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withDriverName("ru.yandex.clickhouse.ClickHouseDriver") .withUrl(clusterUrl) .withUsername(clusterUsername) .withPassword(clusterPassword) .build());}
使用flink-connector-jdbc的JdbcSink.sink() api来构造Flink sink。JdbcSink.sink()入参含义如下
- sql:占位符形式的sql语句,例如:insert into demo (id, name) values (?, ?)
- new CkSinkBuilder<>():org.apache.flink.connector.jdbc.JdbcStatementBuilder接口的实现类,主要是将流中数据映射到java.sql.PreparedStatement 来构造PreparedStatement,具体不再赘述。
- 第三个入参:flink sink的执行策略。
- 第四个入参:jdbc的驱动,连接,账号与密码。
- 使用时直接在DataStream流中addSink即可。
5 Flink写入ClickHouse策略
上段(4.2)代码中new JdbcExecutionOptions.Builder().withBatchSize(200000).build()为写入策略,ClickHouse为了提高写入性能建议进行不少于1000行的批量写入,或每秒不超过一个写入请求。策略是20W行记录进行写入一次,Flink进行Checkpoint的时候也会进行写入提交。所以当数据量积攒到20W或者Flink记性Checkpoint的时候ClickHouse里面才会有数据。我们的ES sink策略是1000行或5s进行写入提交,所以出现了写入ClickHouse要比写入ES慢的现象。
Flink中的org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat#open处理逻辑如下图。
6 写入分布式表还是本地表
网上的资料和ClickHouse云服务的同事都建议写入本地表。分布式表实际上是一张逻辑表并不存储真实的物理数据。如查询分布式表,分布式表会把查询请求发到每一个分片的本地表上进行查询,然后再集合每个分片本地表的结果,汇总之后再返回。写入分布式表,分布式表会根据一定规则,将写入的数据按照规则存储到不同的分片上。如果写入分布式表也只是单纯的网络转发,影响也不大,但是写入分布式表并非单纯的转发,实际情况见下图。
- 第一步:写入分布式表1000条数据,分布式表会根据路由规则,假设按照规则300条分配到S1,200条到S2,500条到S3
- 第二步:client给过来1000条数据,属于S1的300条数据直接写入磁盘,数据S2,S3的数据也会写入到S1的临时目录
- 第三步:S2,S3接收到zk的变更通知,生成拉取S1中当前分片对应的临时目录数据的任务,并且将任务放到一个队列,等到某个时机会将数据拉到自身节点。
从分布式表的写入方式可以看到,会将所有数据落到client连接分片的磁盘上。如果数据量大,磁盘的IO会造成瓶颈。并且MergeTree系列引擎存在合并行为,本身就有写放大(一条数据合并多次),占用一定磁盘性能。在网上看到写入本地表的案例都是日增量百亿,千亿。我们选择写入分布式表主要有两点,一是简单,因为写入本地表需要改造代码,自己指定写入哪个节点,另一个是开发过程中写入本地表并未出现什么严重的性能瓶颈。双十一期间数据日增3000W(合并后)行并未造成写入压力。如果后续产生瓶颈,可能会放弃写入分布式表。
7 为什么只有某个分片CPU使用率高
7.1 数据分布不均匀,导致部分节点CPU高
7.2 某节点触发合并,导致该节点CPU高
SELECT ifNull(sum(t1.unTrackQty), 0) AS unTrackQtyFROM wms.wms_order_sku_local AS t1 FINAL PREWHERE t1.shipmentOrderCreateTime > '2021-11-17 11:00:00' AND t1.shipmentOrderCreateTime <= '2021-11-18 11:00:00' AND t1.gridStationNo = 'WG0000514' AND t1.warehouseNo NOT IN ('wms-6-979', 'wms-6-978', '6_979', '6_978') AND t1.orderType = '10'WHERE t1.ckDeliveryTaskStatus = '3'
但是我们有个疑惑,同样的语句,同样的执行次数,而且两个节点的数据量,part数量都没有差异,为什么7-4节点扫描的行数是7-0上的5倍,把这个原因找到,应该就能定位到问题的根本原因了。
接下来我们使用clickhouse-client进行SQL查询,开启trace级别日志,查看SQL的执行过程。具体执行方式以及查询日志分析参考下文9.1小节,这里我们直接分析结果。
- 7-0节点:扫描了4个part分区文件,共计94W行,耗时0.089s
- 7-4节点:扫描了2个part分区文件,其中有一个part491W行,共计502W行,耗时0.439s
很明显7-4节点的202111_0_408188_322这个分区比较异常,因为我们是按月分区的,7-4节点不知道什么原因发生了分区合并,导致我们检索的11月17号的数据落到了这个大分区上,所以但是查询会过滤11月初到18号的所有数据,和7-0节点产生了差异。上述的SQL通过 gridStationNo = ‘WG0000514’ 条件进行查询,所以在对gridStationNo 字段进行创建二级索引后解决了这个问题。
7.3 物理机故障
这种情况少见,但是也遇到过一次
8 如何定位是哪些SQL在消耗CPU
8.1 grafana定位高频执行SQL
在12月份上线了一些需求,最近发现CPU使用率对比来看使用率偏高,需要排查具体是哪些SQL导致的。
8.2 扫描行数高/使用内存高:query_log_all分析
ClickHouse自身有system.query_log表,用于记录所有的语句的执行日志,下图是该表的一些关键字段信息
-- 创建query_log分布式表CREATE TABLE IF NOT EXISTS system.query_log_allON CLUSTER defaultAS system.query_logENGINE = Distributed(sht_ck_cluster_pro,system,query_log,rand());-- 查询语句select -- 执行次数 count(), -- 平均查询时间 avg(query_duration_ms) avgTime, -- 平均每次读取数据行数 floor(avg(read_rows)) avgRow, -- 平均每次读取数据大小 floor(avg(read_rows) / 10000000) avgMB, -- 具体查询语句 any(query), -- 去除掉where条件,用户group by归类 substring(query, positionCaseInsensitive(query, 'select'), positionCaseInsensitive(query, 'from')) as queryLimitfrom system.query_log_all/system.query_logwhere event_date = '2022-01-21' and type = 2group by queryLimitorder by avgRow desc;
query_log是本地表,需要创建分布式表,查询所有节点的查询日志,然后再执行查询分析语句,执行效果见下图,图中可以看出有几个语句平均扫秒行数已经到了亿级别,这种语句可能就存在问题。通过扫描行数可以分析出索引,查询条件等不合理的语句。7.2中的某个节点CPU偏高就是通过这种方式定位到有问题的SQL语句,然后进一步排查从而解决的。
9 如何优化慢查询
9.1 使用服务日志进行慢查询分析
虽然ClickHouse在20.6版本之后已经提供查看查询计划的原生EXPLAIN,但是提供的信息对我们进行慢SQL优化提供的帮助不是很大,在20.6版本前借助后台的服务日志,可以拿到更多的信息供我们分析。与EXPLAIN相比我更倾向于使用查看服务日志这种方式进行分析,这种方式需要使用clickhouse-client进行执行SQL语句,文末有通过docker搭建CK环境文档。高版本的EXPLAIN提供了ESTIMATE可以查询到SQL语句扫描的part数量、数据行数等细粒度信息,EXPLAIN使用方式可以参考官方文档说明。
用一个慢查询来进行分析,通过8.2中的query_log_all定位到下列慢SQL。
select ifNull(sum(interceptLackQty), 0) as interceptLackQtyfrom wms.wms_order_sku_local final prewhere productionEndTime = '2022-02-17 08:00:00' and orderType = '10'where shipmentOrderDetailDeleted = '0' and ckContainerDetailDeleted = '0'
使用clickhouse-client,send_logs_level参数指定日志级别为trace。
clickhouse-client -h 地址 --port 端口 --user 用户名 --password 密码 --send_logs_level=trace
在client中执行上述慢SQL,服务端打印日志如下,日志量较大,省去部分部分行,不影响整体日志的完整性。
[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.036317 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Debug> executeQuery: (from 11.77.96.163:35988, user: bjwangjiangbo) select ifNull(sum(interceptLackQty), 0) as interceptLackQty from wms.wms_order_sku_local final prewhere productionEndTime = '2022-02-17 08:00:00' and orderType = '10' where shipmentOrderDetailDeleted = '0' and ckContainerDetailDeleted = '0'[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.037876 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> ContextAccess (bjwangjiangbo): Access granted: SELECT(orderType, interceptLackQty, productionEndTime, shipmentOrderDetailDeleted, ckContainerDetailDeleted) ON wms.wms_order_sku_local[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.038239 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Debug> wms.wms_order_sku_local (SelectExecutor): Key condition: unknown, unknown, and, unknown, unknown, and, and, unknown, unknown, and, and[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.038271 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Debug> wms.wms_order_sku_local (SelectExecutor): MinMax index condition: unknown, unknown, and, unknown, unknown, and, and, unknown, unknown, and, and[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.038399 [ 1340 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> wms.wms_order_sku_local (SelectExecutor): Not using primary index on part 202101_0_0_0_3[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.038475 [ 1407 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> wms.wms_order_sku_local (SelectExecutor): Not using primary index on part 202103_0_17_2_22[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.038491 [ 111 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> wms.wms_order_sku_local (SelectExecutor): Not using primary index on part 202103_18_20_1_22..................................省去若干行(此块含义为:在分区内检索有没有使用索引).................................................[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.039041 [ 1205 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> wms.wms_order_sku_local (SelectExecutor): Not using primary index on part 202202_1723330_1723365_7[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.039054 [ 159 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> wms.wms_order_sku_local (SelectExecutor): Not using primary index on part 202202_1723367_1723367_0[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.038928 [ 248 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> wms.wms_order_sku_local (SelectExecutor): Not using primary index on part 202201_3675258_3700711_1054[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.039355 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Debug> wms.wms_order_sku_local (SelectExecutor): Selected 47 parts by date, 47 parts by key, 9471 marks by primary key, 9471 marks to read from 47 ranges[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.039495 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> MergeTreeSelectProcessor: Reading 1 ranges from part 202101_0_0_0_3, approx. 65536 rows starting from 0[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.039583 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> MergeTreeSelectProcessor: Reading 1 ranges from part 202101_1_1_0_3, approx. 16384 rows starting from 0[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.040291 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> MergeTreeSelectProcessor: Reading 1 ranges from part 202102_0_2_1_4, approx. 146850 rows starting from 0..................................省去若干行(每个分区读取的数据行数信息).................................................[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.043538 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> MergeTreeSelectProcessor: Reading 1 ranges from part 202202_1723330_1723365_7, approx. 24576 rows starting from 0[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.043604 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> MergeTreeSelectProcessor: Reading 1 ranges from part 202202_1723366_1723366_0, approx. 8192 rows starting from 0[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.043677 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> MergeTreeSelectProcessor: Reading 1 ranges from part 202202_1723367_1723367_0, approx. 8192 rows starting from 0..................................完成数据读取,开始进行聚合计算.................................................[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.047880 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> InterpreterSelectQuery: FetchColumns -> Complete[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.263500 [ 1377 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> AggregatingTransform: Aggregating[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.263680 [ 1439 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> Aggregator: Aggregation method: without_key..................................省去若干行(数据读取完成后做聚合操作).................................................[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.263840 [ 156 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> AggregatingTransform: Aggregated. 12298 to 1 rows (from 36.03 KiB) in 0.215046273 sec. (57187.69187876137 rows/sec., 167.54 KiB/sec.)[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.264283 [ 377 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> AggregatingTransform: Aggregated. 12176 to 1 rows (from 35.67 KiB) in 0.215476999 sec. (56507.191284950095 rows/sec., 165.55 KiB/sec.)[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.264307 [ 377 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Trace> Aggregator: Merging aggregated data..................................完成聚合计算,返回最终结果.................................................┌─interceptLackQty─┐│ 563 │└──────────────────┘...................................数据处理耗时,速度,信息展示................................................[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.265490 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Information> executeQuery: Read 73645604 rows, 1.20 GiB in 0.229100749 sec., 321455099 rows/sec., 5.22 GiB/sec.[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.265551 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} <Debug> MemoryTracker: Peak memory usage (for query): 60.37 MiB.1 rows in set. Elapsed: 0.267 sec. Processed 73.65 million rows, 1.28 GB (276.03 million rows/s., 4.81 GB/s.)
现在分析下,从上述日志中能够拿到什么信息,首先该查询语句没有使用主键索引,具体信息如下
同样也没有使用分区索引,具体信息如下
此次查询一共扫描36个parts,9390个MarkRange,通过查询system.parts系统分区信息表发现当前表一共拥有36个活跃的分区,相当于全表扫描。
2022.02.17 21:21:54.265490 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} executeQuery: Read 73645604 rows, 1.20 GiB in 0.229100749 sec., 321455099 rows/sec., 5.22 GiB/sec。
2022.02.17 21:21:54.265551 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} MemoryTracker: Peak memory usage (for query): 60.37 MiB。
1 rows in set. Elapsed: 0.267 sec. Processed 73.65 million rows, 1.28 GB (276.03 million rows/s., 4.81 GB/s.)
- 没有使用主键索引:导致全表扫描
- 没有使用分区索引:导致全表扫描
所以需要再查询条件上添加主键字段或者分区索引来进行优化。
9.2 建表优化
9.2.1 尽量不使用Nullable类型
从实践上看,设置成Nullable对性能影响也没有多大,可能是因为我们数据量比较小。不过官方已经明确指出尽量不要使用Nullable类型,因为Nullable字段不能被索引,而且Nullable列除了有一个存储正常值的文件,还会有一个额外的文件来存储Null标记。
CREATE TABLE test_Nullable( orderNo String, number Nullable(Int16), createTime DateTime) ENGINE = MergeTree()PARTITION BY createTimeORDER BY (orderNo)PRIMARY KEY (orderNo);
上述建表语句为例,number 列会生成number.null.*两个额外文件,占用额外存储空间,而orderNo列则没有额外的null标识的存储文件。
9.2.2 分区粒度
分区粒度根据业务场景特性来设置,不宜过粗也不宜过细。我们的数据一般都是按照时间来严格划分,所以都是按天、按月来划分分区。如果索引粒度过细按分钟、按小时等划分会产生大量的分区目录,更不能直接PARTITION BY create_time,会导致分区数量惊人的多,几乎每条数据都有一个分区会严重的影响性能。如果索引粒度过粗,会导致单个分区的数据量级比较大,上面7.2节的问题和索引粒度也有关系,按月分区,单个分区数据量到达500W级,数据范围1号到18号,只查询17号,18号两天的数据量,但是优化按月分区,分区合并之后不得不处理不相关的1号到16号的额外数据,如果按天分区就不会产生CPU飙升的现象。所以要根据自己业务特性来创建,保持一个原则就是查询只处理本次查询条件范围内的数据,不额外处理不相关的数据。
9.2.3 分布式表选择合适的分片规则
9.3 性能测试,对比优化效果
在聊查询优化之前先说一个小工具,clickhouse提供的一个clickhouse-benchmark性能测试工具,环境和前文提到的一样通过docker搭建CK环境,压测参数可参考官方文档,这里我举一个简单的单并发测试示例。
clickhouse-benchmark -c 1 -h 链接地址 --port 端口号 --user 账号 --password 密码 <<< "具体SQL语句"
9.4 查询优化
9.4.1 条件聚合函数降低扫描数据行数
假设一个接口要统计某天的”入库件量”,”有效出库单量”,”复核件量”。
-- 入库件量select sum(qty) from table_1 final prewhere type = 'inbound' and dt = '2021-01-01';-- 有效出库单量select count(distinct orderNo) final from table_1 prewhere type = 'outbound' and dt = '2021-01-01' where and status = '1' ;-- 复核件量select sum(qty) from table_1 final prewhere type = 'check' and dt = '2021-01-01';
一个接口出三个指标需要上述三个SQL语句查询table_1 来完成,但是我们不难发现dt是一致的,区别在于type和status两个条件。假设dt = ‘2021-01-1’ 每次查询需要扫描100W行数据,那么一次接口请求将会扫描300W行数据。通过条件聚合函数优化后将三次查询改成一次,那么扫描行数将降低为100W行,所以能极大的节省集群的计算资源。
select sumIf(qty, type = 'inbound'), -- 入库件量countIf(distinct orderNo, type = 'outbound' and status = '1'), -- 有效出库单量sumIf(qty, type = 'check') -- 复核件量prewhere dt = '2021-01-01';
条件聚合函数是比较灵活的,可根据自己业务情况自由发挥,记住一个宗旨就是减少整体的扫描量,就能到达提升查询性能的目的。
9.4.2 二级索引
跳数索引是指数据片段按照粒度(建表时指定的index_granularity)分割成小块后,将granularity_value数量的小块组合成一个大的块,对这些大块写入索引信息,这样有助于使用where筛选时跳过大量不必要的数据,减少SELECT需要读取的数据量。
CREATE TABLE table_name( u64 UInt64, i32 Int32, s String, ... INDEX a (u64 * i32, s) TYPE minmax GRANULARITY 3, INDEX b (u64 * length(s)) TYPE set(1000) GRANULARITY 4) ENGINE = MergeTree()...
上例中的索引能让 ClickHouse 执行下面这些查询时减少读取数据量。
SELECT count() FROM table WHERE s < 'z'SELECT count() FROM table WHERE u64 * i32 == 10 AND u64 * length(s) >= 1234
支持的索引类型
- minmax:以index granularity为单位,存储指定表达式计算后的min、max值;在等值和范围查询中能够帮助快速跳过不满足要求的块,减少IO。
- set(max_rows):以index granularity为单位,存储指定表达式的distinct value集合,用于快速判断等值查询是否命中该块,减少IO。
- ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed):将string进行ngram分词后,构建bloom filter,能够优化等值、like、in等查询条件。
- tokenbf_v1(size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed): 与ngrambf_v1类似,区别是不使用ngram进行分词,而是通过标点符号进行词语分割。
- bloom_filter([false_positive]):对指定列构建bloom filter,用于加速等值、like、in等查询条件的执行。
Alter table wms.wms_order_sku_local ON cluster default ADD INDEX belongProvinceCode_idx belongProvinceCode TYPE set(0) GRANULARITY 5;Alter table wms.wms_order_sku_local ON cluster default ADD INDEX productionEndTime_idx productionEndTime TYPE minmax GRANULARITY 5;
重建分区索引数据:在创建二级索引前插入的数据,不能走二级索引,需要重建每个分区的索引数据后才能生效
-- 拼接出所有数据分区的MATERIALIZE语句select concat('alter table wms.wms_order_sku_local on cluster default ', 'MATERIALIZE INDEX productionEndTime_idx in PARTITION '||partition_id||',')from system.partswhere database = 'wms' and table = 'wms_order_sku_local'group by partition_id-- 执行上述SQL查询出的所有MATERIALIZE语句进行重建分区索引数据
9.4.3 final替换argMax进行去重
对比下final和argMax两种方式的性能差距,如下SQL
-- final方式select count(distinct groupOrderCode), sum(arriveNum), count(distinct sku) from tms.group_order final prewhere siteCode = 'WG0001544' and createTime >= '2022-03-14 22:00:00' and createTime <= '2022-03-15 22:00:00' where arriveNum > 0 and test <> '1'-- argMax方式select count(distinct groupOrderCode), sum(arriveNumTemp), count(distinct sku) from (select argMax(groupOrderCode,version) as groupOrderCode, argMax(arriveNum,version) as arriveNumTemp, argMax(sku,version) as sku from tms.group_order prewhere siteCode = 'WG0001544' and createTime >= '2022-03-14 22:00:00' and createTime <= '2022-03-15 22:00:00' where arriveNum > 0 and test <> '1' group by docId)
9.4.4 prewhere替代where
ClickHouse的语法支持了额外的prewhere过滤条件,它会先于where条件进行判断,可以看做是更高效率的where,作用都是过滤数据。当在sql的filter条件中加上prewhere过滤条件时,存储扫描会分两阶段进行,先读取prewhere表达式中依赖的列值存储块,检查是否有记录满足条件,在把满足条件的其他列读出来,以下述的SQL为例,其中prewhere方式会优先扫描type,dt字段,将符合条件的列取出来,当没有任何记录满足条件时,其他列的数据就可以跳过不读了。相当于在Mark Range的基础上进一步缩小扫描范围。prewhere相比where而言,处理的数据量会更少,性能会更高。看这段话可能不太容易理解,
-- 常规方式select count(distinct orderNo) final from table_1 where type = 'outbound' and status = '1' and dt = '2021-01-01';-- prewhere方式select count(distinct orderNo) final from table_1 prewhere type = 'outbound' and dt = '2021-01-01' where and status = '1' ;
上节我们说了使用final进行去重优化。通过final去重,并且使用prewhere进行查询条件优化时有个坑需要注意,prewhere会优先于final进行执行,所以对于status这种值可变的字段处理过程中,能够查询到中间状态的数据行,导致最终数据不一致。
--语句1:使用where + status=1 查询,无法命中docId:123_1这行数据select count(distinct orderNo) final from table_1 where type = 'outbound' and dt = '2021-01-01' and status = '1';--语句2:使用where + status=2 查询,可以查询到docId:123_1这行数据select count(distinct orderNo) final from table_1 where type = 'outbound' and dt = '2021-01-01' and status = '2';
当我们引入prewhere后,语句3写法:prewhere过滤status字段时将status=1,version=102的数据会过滤出来,导致我们查询结果不正确。正确的写法是语句2,将不可变字段使用prewhere进行优化。
-- 语句3:错误方式,将status放到prewhereselect count(distinct orderNo) final from table_1 prewhere type = 'outbound' and dt = '2021-01-01' and status = '1';-- 语句4:正确prewhere方式,status可变字段放到where上select count(distinct orderNo) final from table_1 prewhere type = 'outbound' and dt = '2021-01-01' where and status = '1' ;
其他限制:prewhere目前只能用于MergeTree系列的表引擎
9.4.5 列裁剪,分区裁剪
而分区裁剪就是只读取需要分区,控制好分区字段查询范围。
9.4.6 where、group by 顺序
-- 建表语句create table group_order_local( docId String, version UInt64, siteCode String, groupOrderCode String, sku String, ... 省略非关键字段 ... createTime DateTime) engine = ReplicatedReplacingMergeTree('/clickhouse/tms/group_order/{shard}', '{replica}', version)PARTITION BY toYYYYMM(createTime)ORDER BY (siteCode, groupOrderCode, sku);--查询语句1select count(distinct groupOrderCode) groupOrderQty, ifNull(sum(arriveNum),0) arriveNumSum,count(distinct sku) skuQtyfrom tms.group_order finalprewhere createTime >= '2021-09-14 22:00:00' and createTime <= '2021-09-15 22:00:00'and siteCode = 'WG0000709'where arriveNum > 0 and test <> '1'--查询语句2 (where/prewhere中字段)select count(distinct groupOrderCode) groupOrderQty, ifNull(sum(arriveNum),0) arriveNumSum,count(distinct sku) skuQtyfrom tms.group_order finalprewhere siteCode = 'WG0000709' and createTime >= '2021-09-14 22:00:00' and createTime <= '2021-09-15 22:00:00'where arriveNum > 0 and test <> '1'
建表语句 ORDER BY (siteCode, groupOrderCode, sku),语句1没有符合要求经过压测QPS6.4,TP99 0.56s,语句2符合要求经过压测QPS 14.9,TP99 0.12s
10 如何抗住高并发、保证ClickHouse可用性
max_threads:位于 users.xml 中,表示单个查询所能使用的最大 CPU 个数,默认是 CPU 核数,假如机器是32C,则会起32个线程来处理当前请求。可以把max_threads调低,牺牲单次查询速度来保证ClickHouse的可用性,提升并发能力。可通过jdbc的url来配置
3)异步任务执行查询语句,将聚合指标结果落到ES中,应用查询ES中的聚合结果
4)物化视图,通过预聚合方式解决这种问题,但是我们这种业务场景不适用
11 资料集合
•更改ORDER BY字段,PARTITION BY,备份数据,单表迁移数据等操作
•基于docker搭建grafana监控SQL执行情况
作者:京东物流 马红岩
内容来源:京东云开发者社区