MLSQL源码阅读小指南
如何在Intellij Idea中设置开发环境,可以参考这篇文章:MLSQL Engine开发环境设置 今天我们主要介绍下如何查看源码。
首先,MLSQL Engine本质上是一个web服务,对应的web框架为ServiceFramework,该框架是我早年开发的一个一站式Web框架。Http对应的入口类为streaming.rest.RestController
,最核心的接口为/run/script
,该接口实现了脚本的执行功能。该接口可通过executeMode
参数实现不同业务应用逻辑并且用户可扩展。
executeMode默认有两个,query,analyze。 Query是提供正儿八经的脚本执行,analyze则仅仅是返回parse结果,方便用户做一些有意义的Query解析,完成更高阶的功能,其他都通过插件来进行扩展,比如external/python-controller
允许直接传递python脚本给引擎执行,external/mlsql-autosuggest
则提供代码提示的功能。
/run/script 接口逻辑
该接口第一步是权限检查:
accessAuth 的逻辑比较简单,如果用户配置了spark.mlsql.auth.access_token
,那么就会判断用户传递的access_token
是不是和配置的一致。 其次会检查用户有没有自定义接口权限(通过spark.mlsql.auth.custom
),该类会把所有请求参数作为一个map传给用户的实现类。
第二步是创建一个context,因为代码层次足够深,如果不使用ThreadLocal方式传递参数,那么参数传递会是个灾难。该Context持有用户当前的SparkSession实例以及请求参数等。
val context = createScriptSQLExecListener(sparkSession, jobInfo.groupId)
第三步是根据executeMode
选择执行具体的计算逻辑,本文以query为例子进行介绍。
query会区分异步执行还是同步执行。异步执行就是接到任务后直接返回,然后单独开一个线程执行实际的任务。同步的话就是一直阻塞请求。实际执行逻辑是由tech.mlsql.job.JobManager
执行,他分别提供了run和asyncRun两个方法。JobManager还提供了一些监听器追踪脚本执行进度,以及任务超时控制等功能。
JobManager是通过ScriptSQLExec来执行脚本的解析,翻译,以及最后执行的。所以ScriptSQLExec也是整个MLSQL最核心的类。
ScriptSQLExec解析
ScriptSQLExec全类名是streaming.dsl.ScriptSQLExec
,通过ThreadLocal持有MLSQLExecuteContext
,该Context是前面我们提及的context,包含有SparkSession以及请求参数。
该类的入口是parse
方法,使用了antlrv4的listener模式来解析SQL。先看签名:
input是MLSQL语句,listener则是我们实现的antlrv4的listener,listener的机制是一个完整语句被解析后,就会调用一个回调函数并且给一个语句的context给我们,我们就能拿到这个语句的各种元素。skip*系列参数,则是分别控制是否支持include语法,是否进行权限控制,是否执行真正的物理计划,是否执行提前执行语法校验。
parse 实际上调用了多个不同类型的Listener来完成任务的。 parse 逻辑流程如下:
- 填充预先设置的一些命令行
- 如果支持include,那么会调用
PreProcessIncludeListener
进行Include语法预处理。逻辑比较简单,就是拿include的实际文本替换include语句,最终得到一个不包含include语法的文本。最高嵌套层次为10级,避免循环嵌套。 - 接着调用
PreProcessListener
完成命令行语法转化为原生MLSQL语法,以及evaluate部分set值。默认set语法是解析器,也就是现在这个阶段完成的。我们也可以通过参数指定让其在运行期完成set语法的赋值。 - 如果支持语法校验,会先进行一遍语法解析
- 如果支持权限校验,那么会根据启动参数
spark.mlsql.auth.implClass
或者请求参数context.__auth_client__
来初始化auth client,并且将语法解析得到的所有表提交给该客户端执行校验(解析时权限校验) - 如果支持物理执行,那么则执行实际的代码逻辑。
所以实际上parse执行了如上六个逻辑。
在物理执行计划阶段,对应的Listener是ScriptSQLExecListener
,Antlrv4会每次在得到一个完整语句的时候,回调exitSql
函数。所以这里的逻辑很简单,拿到对应的语句,根据前缀判断具体的翻译逻辑:
我们针对每种类型的语句,都有对应的 *Adaptor来进行翻译以及实际的执行。比如实例中我们看到了LoadAdaptor
以及SelectAdaptor
等。
通常一个Adaptor有两个部分构成:analyze,parse
通过analyze得到一个类型表示一个语句,然后通过parse将其翻译成底层Spark可以执行的逻辑。
在MLSQL中,扩展性最强的是TrainAdaptor
,下面的例子都是通过TrainAdaptor来完成的:
- 命令行
- run/train/predict开头的语法
项目目录的一些小tips
MLSQL底层的执行引擎是Spark,这意味着MLSQL有个问题绕不过去,那就是需要兼容多个Spark版本,这是一件比较困难的事情。MLSQL的解决方案是将Spark变化的API统一维护在streamingpro-spark-x.x.x-adaptor模块中,上层使用经过adaptor适配的API。目前MLSQL支持2.4.x,3.0.x。 2.3.0将很快完全移除。
streamingpro-mlsql是主模块,也就是最高层的模块。依赖关系如下:
streamingpro-mlsql => streamingpro-core/streamingpro-dsl =>
streamingpro-spark-x.x.x-adaptor => streamingpro-spark-common/streamingpro-commons =>
如果要实现Flink支持,基本上可能需要重新实现一遍,但基本可以做到用户层面不受影响。
external
目录基本包含了大部分扩展功能,比如 if/else语法的expression编译器,各种大数据索引共鞥你等都是在这里完成的。