Z-Ordering索引加速大数据查询

2020-01-05 修正:Spark(2.4.3)对Parquet(1.10.1)的min/max过滤是RowGoup级别(min/max meta存储在parquet的footer里),之前说的Page级别是不正确的。 感谢网友恩爸指正。

“快”就是生产力。【让数据处理速度跟上用户的思维】很重要。而索引技术是加速查询非常重要的一个方面。 对于大数据索引的一些概念,不妨参看数据即索引-大数据索引漫谈。 里面其实核心就如下几个观点:

  1. 大数据索引的核心是做File Skip 而非 Row Skip
  2. 索引蕴含在数据的分布里,索引和数据通常是一体的

今天再加上两个观点:

  1. 大数据索引在时延上可以非常灵活。小到实时,大到分钟天
  2. 索引是可以在原始数据上的,也可以是独立的数据副本

场景和需求

先看看我们遇到的问题,条件过滤是个很重要的查询场景,比如如下语句:

select * from delta_mlsql_job 
where status=4 and mlsql_user_id=1

类似 "x<10 and x>1"也是常见场景。 假设我们以Spark作为计算引擎,Parquet作为存储格式 来分析下这个查询是怎么执行的。

在此之前,我们先简单理解下parquet存储格式:

  1. 一个Parquet文件 = N个RowGroup + 一个Footer组成
  2. RowGroup = N个 ColumnChunk (列式存,每个列会组成一个Chunk)
  3. ColumnChunk = N个Page + N个Page Meta

Page 算是Parquet最小的扫描单元了(实际上在spark 2.4.3是会直接读取一个ColumnChunk的所有pages的,而不会在基于page的元数据做过滤) 通常一个表是由多个parquet文件组成。当上面的示例查询过来的时候,spark首先是要加载所有的parquet footer获取RowGroup以及ColumnChunk信息,然后使用用户的filter对RowGroup进行过滤。

一个ColumnChunk在footer里存储有自己的min/max值。假设数据是散乱的,那么这个min/max值基本没有价值,比如极端情况是每个ColumnChunk的min/max值都是一样且是实际全量数据的最大值和最小值,这个时候就完全没有过滤效果了,计算引擎需要对每个ColumnChunk的数据都进行加载和扫描过滤。

所以这个时候聪明的你就想着,那我如果对数据事先排序,就可以让ColumnChunk 的min/max值是连续不重叠的了,这个时候过滤效果就会很明显。比如按status排序,这个时候如果用户的查询包含了status,那么效果就会很好。但是如果用户的查询不包含status,那么效果可能就会很差了。常见情况其实用户都会使用多个字段进行过滤,那有没有办法能解决这个问题? 答案是肯定的,就是今天的主角Z-Ordering.

Z-Ordering的解决方案出奇的简单,把你的表新增一列,然后这列的值是用你可能会用到的过滤的列组装出来的,组装的方式很简单,按某种方式把这些列的值转化为byte数组,然后按bit位interleaving即可,interleaving方式如下:

图中是两个字段的拼接,多个字段也是按相同的方式进行拼接。接着,用这个新列进行排序,然后再写入到存储中即可。这样就算完成了索引的构建了。

查询的时候,要求系统根据用户的查询条件,自动添加这一列进行过滤。

Z-Ordering 本质上是把多维转化为一维,并且还保持可排序性。当然,列越多,最后Z-Ordering的地址空间就越大,过滤效果就越差。

Z-Ordering的基础操作

前面我们说,我们需要按某种方式把这些列的值转化为byte数组,然后按bit位interleaving。interleaving的规则很清楚,但是这里的“按某种方式把这些列的值转化为byte数组”是指的什么呢?能否直接用比如数字或者字符串的二进制表示形式么?答案是不能。这里我们以Java为例,Z-Ordering要求数字和字符串的二进制形式要保留原来的可排序。比如:

-1 的二进制原码是: 10000000 00000000 00000000 00000001 1 的二进制源码是: 00000000 00000000 00000000 00000001

所谓二进制的可排序性是指按从左到右按字面位比较。所以-1的原码二进制是比1的大的。而实际上在java里-1是用补码表示的,是一个字面上更大的值(11111111 11111111 11111111 11111111 )。 所以我们需要对数字的正负性进行处理,保证他的二进制字面比较能和实际的值一致。为了解决这个问题,我们对有符号数需要做三个操作

  1. 统一转化为原码表示
  2. 最高位翻转
  3. 从左边补,补足八个字节

对于utf8编码,只要简单的截断成八个字节即可。

比如int类型的操作实现如下:

public static byte[] toBytes(long val) {
    long temp = val;
    byte[] b = new byte[8];
    for (int i = 7; i > 0; i--) {
        b[i] = (byte) temp;
        temp >>>= 8;
    }
    b[0] = (byte) temp;
    return b;
}
//考虑负数,如果是负数,还原成原码表示,然后直接将第一位翻转,最后padding 成8字节
// 正数,第一位翻转,然后Padding成8字节
public static byte[] intTo8Byte(int a) {
    int temp = a;
    if (a < 0) {
        temp = (~(a - 1))^ (1 << 31);
    }
    temp = temp ^ (1 << 31);
    return paddingTo8Byte(toBytes(temp));
}

接着,当我们获取到多个8字节标的字段时,就可以对他们进行interleaving了:

 //用b 的bpos bit 位,设置a的 apos bit位
public static byte updatePos(byte a, int apos, byte b, int bpos) {
    //将bpos以外的都设置为0
    byte temp = (byte) (b & (1 << (7 - bpos)));
    //把temp bpos位置的值移动到apos

    //小于的话,左移
    if (apos < bpos) {
        temp = (byte) (temp << (bpos - apos));
    }
    //大于,右边移动
    if (apos > bpos) {
        temp = (byte) (temp >> (apos - bpos));
    }
    //把apos以外的都设置为0
    byte atemp = (byte) (a & (1 << (7 - apos)));
    if ((byte) (atemp ^ temp) == 0) {
        return a;
    }
    return (byte) (a ^ (1 << (7 - apos)));
}

//每个属性用8byte表示。但是属性数目不确定。
public static byte[] interleaveMulti8Byte(byte[][] buffer) {
    int attributesNum = buffer.length;
    byte[] result = new byte[8 * attributesNum];

    //结果的第几个byte的第几个位置
    int resBitPos = 0;

    //每个属性总的bit数
    int totalBits = 64;
    //第一层循环移动bit
    for (int bitStep = 0; bitStep < totalBits; bitStep++) {
        //首先获取当前属性在第几个byte(总共八个)
        int tempBytePos = (int) Math.floor(bitStep / 8);
        //获取bitStep在对应属性的byte位的第几个位置
        int tempBitPos = bitStep % 8;

        //获取每个属性的bitStep位置的值
        for (int i = 0; i < attributesNum; i++) {
            int tempResBytePos = (int) Math.floor(resBitPos / 8);
            int tempResBitPos = resBitPos % 8;
            result[tempResBytePos] = updatePos(result[tempResBytePos], tempResBitPos, buffer[i][tempBytePos], tempBitPos);
            //结果bit要不断累加
            resBitPos++;
        }
    }


    return result;
}

更多代码可参考这里ZOrderingBytesUtil

构建索引

现在,利用上面的工具类,我们就可以将他们应用到你的DataFrame里,从而让你的数据包含这个新的列。 值得注意的是,查询的时候用户可能不会写全你索引的字段,但是Z-Ordering编码是要求三个字段都必须在的,这个时候你就可以使用缺失字段的min/max值进行补充。所以我们需要在写入数据的时候,同时将索引字段的min-max值写到schema的metadata里,方便后续改写Query的时候获取到。

具体代码参看 ZOrderingIndexer.scala#L179

现阶段支持 Float/Double/Int/Long/String 五个类型的字段。日期字段后续会加上。

改写查询

用户其实并不知道Z-Ordering字段的存在,所以他还是会正常的写成

select * from delta_mlsql_job 
where status=4 and mlsql_user_id=1

我们的目标是将其修改为:

select * from delta_mlsql_job where status=4 and mlsql_user_id=1
and __mlsql_indexer_zordering_8e8e3948573bfcb5c7267884fa389375 <= 0xE00000000000000000000000000000000000000000048342
and 0xE00000000000000000000000000000000000000000048142 <= __mlsql_indexer_zordering_8e8e3948573bfcb5c7267884fa389375

其中 __mlsql_indexer_zordering_8e8e3948573bfcb5c7267884fa389375 是索引字段名称。 值得注意的是,Z-Ordering核心是缩小的你的查询范围,而不是精准定位记录,所以原始的过滤条件还是需要继续保留的, 我们通过新增一个字段,加强过滤能力。

修改的方法也比较简单,根据原始SQL获取 LogicalPlan,然后修改LogicalPlan即可。具体代码可参考:

ZOrderingIndexer.scala#L23

目前我只实现了Equal表达式,GreatEq/Great和LessEq/Less后续也是按相同的方式来实现。

进阶

其实如果只是做到利用Parquet的ColumnChunk级别的min/max值,效果还是大打折扣的,完全可以做到Parquet File级别的Skip. 因为事先根据Z-Ordering做了排序,所以Parquet File对应的min/max值都不会重叠,这意味着可以直接根据min/max值进行File Skip了。 可能一个几十亿的表,你只要扫描一个文件即可,获得不可思议的性能提升。

那么要做这个过滤,通用的解决方案是在Parquet 目录里维护一个json格式的meta信息,每个Parquet文件对应一个min/max值。如果是delta,可以 直接在delta_log里的json文件中的AddFile对象中添加min/max到stats属性里。

缺点

Z-Ordering的缺点也比较明显,就是一份数据只能有一个Z-Ordering索引(因为数据要求有序), 而如果用户构建该索引的字段过多,则可能会导致过滤效果很差。解决办法是增加数据副本,然后查询时根据需求条件查询不同的数据副本。

Bonus

对于含有日期等分区的数据怎么办?保证每个分区里的数据是是按Z-Ordering索引排序即可。

results matching ""

    No results matching ""