Spark JDBC加载慢怎么办

Spark可以读取支持JDBC协议的大部分数据源,但实际使用下来,用户会一头包,主要是三个问题:

  1. 计算特别慢
  2. Spark似乎特别容易挂掉
  3. 数据源负载重

第三个应该最好理解,Spark读的太猛了,数据源自然压力大。1,2 的原因则是你没有充分发挥Spark的潜力。

今天我们重点来看看,为啥正常读取JDBC时,没有办法发挥Spark的潜力。

首先,大家先思考一个问题,如果让你写一个程序,读取MySQL的数据,你怎么写?

典型的如下:

val driver = options("driver")
    val url = options("url")
    Class.forName(driver)
    val connection = java.sql.DriverManager.getConnection(url, options("user"), options("password"))
    val stat = connection.prepareStatement(sql)
    val rs = stat.executeQuery()
    while(rs.next){
      val row = rs.next()
    }
    rs.close
    connection.close

对于一个几千万甚至上亿条数据的大表,大家不妨看看遍历一遍需要多久。 这里问题的根源是,你只是一个线程去读,从头读到尾,那肯定很慢。

如果想加快速度,大家自然会想到多线程,如果用多线程大家应该怎么读呢?多线程无非就是分而治之,每个线程读一部分数据。比如假设有id为0到100的数据,id不连续,并且实际上有50条记录,如果我们希望分而治之,那么最好是五个线程,每个线程读取10条数据。但是肯定是做不到这么精确的。最简单的办法是,产生五条如下的SQL,

select * from xxx where id<20;
select * from xxx where id>=20 and id <40;
select * from xxx where id>=40 and id <60;
select * from xxx where id>=60 and id <80;
select * from xxx where id>=80;

因为id中间有空隙,所以每条SQL实际拿到的数据并不一样。但没关系,通过五个线程执行这五条SQL,我们肯定可以通过更少的时间获取到全量数据。

现在回过头来,我们想想Spark,Spark快很大一部分原因其实是分布式多线程的执行方式,这样可以充分利用多核算力。但是面对一个JDBC接口,Spark 并不知道如何并行的去拉取一个表的数据,所以就傻傻的用一个线程去拉取数据,一个线程拉的多了,GC压力就大,不但慢,还容易把这个线程所在的进程搞挂。这也是前面提到的1,2的原因。

那如果我们想让数据拉取变快,狠心就是要让Spark并行化拉取数据。这个时候你需要提供一个类似我们前面提到的,可以切分查询的字段给Spark。那么怎么给Spark呢?Spark和你约定了四个字段:

  1. partitionColumn 按哪个列进行分区
  2. lowerBound, upperBound, 分区字段的最小值,最大值(可以使用directQuery获取)
  3. numPartitions 分区数目。一般8个线程比较合适。

首先,你得告诉我,哪个字段是可以切分的,你可以通过partitionColumn告诉我。接着该怎么切分呢,如果像上面这样一组一组的告诉我,比如这样(...,20),[20,40),[40,60),[60,80)....[80,...) 。少还可以,如果面对上百组的情况,你肯定会疯掉。而且这样也不好后续做修改,比如我想搞多点搞少点都会修改的很麻烦。所以Spark权衡后,最后给的设计是这样的,你告诉第一组的最大值,最后一组的最小值,然后告诉我到底要几组,就可以了。比如上面的例子,第一组的最小值是20,最后一组的最大值是80,然后总共5组,这样Spark就知道20-80之间还要再分三组。

所以此时:

  1. lowerBound=20
  2. upperBound=80
  3. numPartitions=5

系统会自动产生(...,20),[20,40)....[80,...) 这样的序列,然后每个小组产生一条SQL。这样就可以并行拉去数据了。

因为我们希望尽可能的根据这个切分,能切分均匀,所以最好的字段肯定是自增字段。

理解了上面的问题之后,大家普遍还会遇到三个疑问:

第一个,那如果我没有自增字段该怎么办呢?甚至没有数字字段。能不能用比如oracle的虚拟字段rownum,或者利用mysql的虚拟行号字段? 其实是可以的,但是可能会对数据源产生比较大的压力,比如MySQL如果使用虚拟行号,会产生巨大的临时表。

第二个,是只能数字字段么?日期行不行。答案是可以。目前分区字段(以spark 2.4.3为例)支持的类型有三种:

  1. 数字类型(统一会转换为long类型)
  2. DateType, 对应的文本格式为yyyy-MM-dd
  3. TimestampType 对应的文本格式为 yyyy-MM-dd HH:mm:ss

第三个是,我该如何知道lowerBound,upperBound的值呢。其实很简单,你获取这个字段的min/max值即可。 如果你的数据新增量不大,你不用担心最后[max,) 这个分区的数据太多。

另外,如果如果还有疑问,可以参考加载JDBC(如MySQL,Oracle)数据常见困惑

MLSQL作为一个优秀的面向大数据和AI的语言和引擎,其实针对MySQL,也给出了解决方案:如何开箱即用的分析你的数据库数据(JDBC索引技术预览)

results matching ""

    No results matching ""