Z-Ordering索引加速大数据查询
2020-01-05 修正:Spark(2.4.3)对Parquet(1.10.1)的min/max过滤是RowGoup级别(min/max meta存储在parquet的footer里),之前说的Page级别是不正确的。 感谢网友恩爸指正。
“快”就是生产力。【让数据处理速度跟上用户的思维】很重要。而索引技术是加速查询非常重要的一个方面。 对于大数据索引的一些概念,不妨参看数据即索引-大数据索引漫谈。 里面其实核心就如下几个观点:
- 大数据索引的核心是做File Skip 而非 Row Skip
- 索引蕴含在数据的分布里,索引和数据通常是一体的
今天再加上两个观点:
- 大数据索引在时延上可以非常灵活。小到实时,大到分钟天
- 索引是可以在原始数据上的,也可以是独立的数据副本
场景和需求
先看看我们遇到的问题,条件过滤是个很重要的查询场景,比如如下语句:
select * from delta_mlsql_job
where status=4 and mlsql_user_id=1
类似 "x<10 and x>1"也是常见场景。 假设我们以Spark作为计算引擎,Parquet作为存储格式 来分析下这个查询是怎么执行的。
在此之前,我们先简单理解下parquet存储格式:
- 一个Parquet文件 = N个RowGroup + 一个Footer组成
- RowGroup = N个 ColumnChunk (列式存,每个列会组成一个Chunk)
- ColumnChunk = N个Page + N个Page Meta
Page 算是Parquet最小的扫描单元了(实际上在spark 2.4.3是会直接读取一个ColumnChunk的所有pages的,而不会在基于page的元数据做过滤) 通常一个表是由多个parquet文件组成。当上面的示例查询过来的时候,spark首先是要加载所有的parquet footer获取RowGroup以及ColumnChunk信息,然后使用用户的filter对RowGroup进行过滤。
一个ColumnChunk在footer里存储有自己的min/max值。假设数据是散乱的,那么这个min/max值基本没有价值,比如极端情况是每个ColumnChunk的min/max值都是一样且是实际全量数据的最大值和最小值,这个时候就完全没有过滤效果了,计算引擎需要对每个ColumnChunk的数据都进行加载和扫描过滤。
所以这个时候聪明的你就想着,那我如果对数据事先排序,就可以让ColumnChunk 的min/max值是连续不重叠的了,这个时候过滤效果就会很明显。比如按status排序,这个时候如果用户的查询包含了status,那么效果就会很好。但是如果用户的查询不包含status,那么效果可能就会很差了。常见情况其实用户都会使用多个字段进行过滤,那有没有办法能解决这个问题? 答案是肯定的,就是今天的主角Z-Ordering.
Z-Ordering的解决方案出奇的简单,把你的表新增一列,然后这列的值是用你可能会用到的过滤的列组装出来的,组装的方式很简单,按某种方式把这些列的值转化为byte数组,然后按bit位interleaving即可,interleaving方式如下:
图中是两个字段的拼接,多个字段也是按相同的方式进行拼接。接着,用这个新列进行排序,然后再写入到存储中即可。这样就算完成了索引的构建了。
查询的时候,要求系统根据用户的查询条件,自动添加这一列进行过滤。
Z-Ordering 本质上是把多维转化为一维,并且还保持可排序性。当然,列越多,最后Z-Ordering的地址空间就越大,过滤效果就越差。
Z-Ordering的基础操作
前面我们说,我们需要按某种方式把这些列的值转化为byte数组,然后按bit位interleaving。interleaving的规则很清楚,但是这里的“按某种方式把这些列的值转化为byte数组”是指的什么呢?能否直接用比如数字或者字符串的二进制表示形式么?答案是不能。这里我们以Java为例,Z-Ordering要求数字和字符串的二进制形式要保留原来的可排序。比如:
-1 的二进制原码是: 10000000 00000000 00000000 00000001 1 的二进制源码是: 00000000 00000000 00000000 00000001
所谓二进制的可排序性是指按从左到右按字面位比较。所以-1的原码二进制是比1的大的。而实际上在java里-1是用补码表示的,是一个字面上更大的值(11111111 11111111 11111111 11111111 )。 所以我们需要对数字的正负性进行处理,保证他的二进制字面比较能和实际的值一致。为了解决这个问题,我们对有符号数需要做三个操作
- 统一转化为原码表示
- 最高位翻转
- 从左边补,补足八个字节
对于utf8编码,只要简单的截断成八个字节即可。
比如int类型的操作实现如下:
public static byte[] toBytes(long val) {
long temp = val;
byte[] b = new byte[8];
for (int i = 7; i > 0; i--) {
b[i] = (byte) temp;
temp >>>= 8;
}
b[0] = (byte) temp;
return b;
}
//考虑负数,如果是负数,还原成原码表示,然后直接将第一位翻转,最后padding 成8字节
// 正数,第一位翻转,然后Padding成8字节
public static byte[] intTo8Byte(int a) {
int temp = a;
if (a < 0) {
temp = (~(a - 1))^ (1 << 31);
}
temp = temp ^ (1 << 31);
return paddingTo8Byte(toBytes(temp));
}
接着,当我们获取到多个8字节标的字段时,就可以对他们进行interleaving了:
//用b 的bpos bit 位,设置a的 apos bit位
public static byte updatePos(byte a, int apos, byte b, int bpos) {
//将bpos以外的都设置为0
byte temp = (byte) (b & (1 << (7 - bpos)));
//把temp bpos位置的值移动到apos
//小于的话,左移
if (apos < bpos) {
temp = (byte) (temp << (bpos - apos));
}
//大于,右边移动
if (apos > bpos) {
temp = (byte) (temp >> (apos - bpos));
}
//把apos以外的都设置为0
byte atemp = (byte) (a & (1 << (7 - apos)));
if ((byte) (atemp ^ temp) == 0) {
return a;
}
return (byte) (a ^ (1 << (7 - apos)));
}
//每个属性用8byte表示。但是属性数目不确定。
public static byte[] interleaveMulti8Byte(byte[][] buffer) {
int attributesNum = buffer.length;
byte[] result = new byte[8 * attributesNum];
//结果的第几个byte的第几个位置
int resBitPos = 0;
//每个属性总的bit数
int totalBits = 64;
//第一层循环移动bit
for (int bitStep = 0; bitStep < totalBits; bitStep++) {
//首先获取当前属性在第几个byte(总共八个)
int tempBytePos = (int) Math.floor(bitStep / 8);
//获取bitStep在对应属性的byte位的第几个位置
int tempBitPos = bitStep % 8;
//获取每个属性的bitStep位置的值
for (int i = 0; i < attributesNum; i++) {
int tempResBytePos = (int) Math.floor(resBitPos / 8);
int tempResBitPos = resBitPos % 8;
result[tempResBytePos] = updatePos(result[tempResBytePos], tempResBitPos, buffer[i][tempBytePos], tempBitPos);
//结果bit要不断累加
resBitPos++;
}
}
return result;
}
更多代码可参考这里ZOrderingBytesUtil
构建索引
现在,利用上面的工具类,我们就可以将他们应用到你的DataFrame里,从而让你的数据包含这个新的列。 值得注意的是,查询的时候用户可能不会写全你索引的字段,但是Z-Ordering编码是要求三个字段都必须在的,这个时候你就可以使用缺失字段的min/max值进行补充。所以我们需要在写入数据的时候,同时将索引字段的min-max值写到schema的metadata里,方便后续改写Query的时候获取到。
具体代码参看 ZOrderingIndexer.scala#L179
现阶段支持 Float/Double/Int/Long/String 五个类型的字段。日期字段后续会加上。
改写查询
用户其实并不知道Z-Ordering字段的存在,所以他还是会正常的写成
select * from delta_mlsql_job
where status=4 and mlsql_user_id=1
我们的目标是将其修改为:
select * from delta_mlsql_job where status=4 and mlsql_user_id=1
and __mlsql_indexer_zordering_8e8e3948573bfcb5c7267884fa389375 <= 0xE00000000000000000000000000000000000000000048342
and 0xE00000000000000000000000000000000000000000048142 <= __mlsql_indexer_zordering_8e8e3948573bfcb5c7267884fa389375
其中 __mlsql_indexer_zordering_8e8e3948573bfcb5c7267884fa389375
是索引字段名称。
值得注意的是,Z-Ordering核心是缩小的你的查询范围,而不是精准定位记录,所以原始的过滤条件还是需要继续保留的,
我们通过新增一个字段,加强过滤能力。
修改的方法也比较简单,根据原始SQL获取 LogicalPlan,然后修改LogicalPlan即可。具体代码可参考:
目前我只实现了Equal表达式,GreatEq/Great和LessEq/Less后续也是按相同的方式来实现。
进阶
其实如果只是做到利用Parquet的ColumnChunk级别的min/max值,效果还是大打折扣的,完全可以做到Parquet File级别的Skip. 因为事先根据Z-Ordering做了排序,所以Parquet File对应的min/max值都不会重叠,这意味着可以直接根据min/max值进行File Skip了。 可能一个几十亿的表,你只要扫描一个文件即可,获得不可思议的性能提升。
那么要做这个过滤,通用的解决方案是在Parquet 目录里维护一个json格式的meta信息,每个Parquet文件对应一个min/max值。如果是delta,可以 直接在delta_log里的json文件中的AddFile对象中添加min/max到stats属性里。
缺点
Z-Ordering的缺点也比较明显,就是一份数据只能有一个Z-Ordering索引(因为数据要求有序), 而如果用户构建该索引的字段过多,则可能会导致过滤效果很差。解决办法是增加数据副本,然后查询时根据需求条件查询不同的数据副本。
Bonus
对于含有日期等分区的数据怎么办?保证每个分区里的数据是是按Z-Ordering索引排序即可。