MLSQL插件开发系列3-插件扩展点
对应版本 2.1.0-SNAPSHOT
MLSQL PluginHook支持四种插件:
- APP
- DS
- ET
- SCRIPT
APP插件扩展点,本质上是执行一些自定义代码逻辑。这意味这在引擎启动后,你可以完成任何你想要的代码。比如初始化其他三类插件都是可以的。所以APP插件是最灵活的。
DS 则是数据源插件,你可以实现一些官方没有实现的数据源,从而使得MLSQL能够操作他们。 ET则可以扩展run/train语法。 Script插件是允许你把MLSQL脚本打包成jar包实现部署和安装。
APP插件可以通过-streaming.plugin.clzznames
指定亦或是网络安装。可参考MLSQL插件安装
其他插件可以通过包装成APP插件的方式注册亦或是通过!plugin
来安装。
插件举例
mlsql-sql-profiler 是MLSQL的一个内置插件,该插件是一个APP插件,但是内部其实包含了很多ET插件,Controller扩展点等。
我们可以看看入口实现:
class ProfilerApp extends tech.mlsql.app.App with VersionCompatibility {
这里,你需要实现App和VersionCompatibility两个接口,分别要实现run
以及supportedVersions
两个函数。
其中run
是在Runtime启动回调执行。所以相当于提供了一个钩子给你。在ProfilerApp中,我们注册很多ET组件:
override def run(args: Seq[String]): Unit = {
AppRuntimeStore.store.registerController("genSQL", classOf[GenSQLController].getName)
AppRuntimeStore.store.registerController("indexRewrite", classOf[IndexerRewriteController].getName)
ETRegister.register(ProfilerApp.MODULE_NAME, classOf[ProfilerCommand].getName)
ETRegister.register("ZOrdering", classOf[ZOrdering].getName)
ETRegister.register("CubeIndexerBuilder", classOf[CubeIndexerBuilder].getName)
ETRegister.register("CubeIndexerQuery", classOf[CubeIndexerQuery].getName)
CommandCollection.refreshCommandMapping(Map(ProfilerApp.COMMAND_NAME -> ProfilerApp.MODULE_NAME))
AppRuntimeStore.store.registerResultRender("IndexerPlugin", classOf[IndexerPlugin].getName)
}
从名字可以看到,我们对于我们实现的一些插件或者扩展点,系统提供了注册类。AppRuntimeStore.store.registerController
允许你在/run/script
接口中注册你自己的逻辑,行为通过executeMode
参数控制.
ETRegister则是注册ET插件了。比如这里我们注册了ZOrdering
插件,现在,我们可以在代码里这么写:
train dataTable as ZOrdering.``
where indexFields="status,mlsql_user_id,created_at"
and fileNum="5"
as newDF;
当系统遇到这段代码的时候,就会去调用classOf[ZOrdering]
。
大家可能还注意到 CommandCollection.refreshCommandMapping(Map(ProfilerApp.COMMAND_NAME -> ProfilerApp.MODULE_NAME))
,这个代码是用来干什么的呢?其实就是允许你把run/train语句转化成一个命令。比如对于上面的ZOrdering插件,假设我们注册为一个命令:
CommandCollection.refreshCommandMapping(Map("zorder" -> "ZOrdering"))
那么我们就可以在MLSQL中这么使用:
!zorder _ -input dataTable -indexFields "status,mlsql_user_id,created_at";
最后还有一个代码:
AppRuntimeStore.store.registerResultRender
registerResultRender
是用来干嘛的呢?他允许你修改最后的逻辑执行计划。你不如你写了如下一条SQL语句:
select 1 as a as b;
执行的时候,系统会获得一个LogialPlan,这个时候你可以在真正执行这个LP之前,修改LP,从而实现一些改写行为。比如在示例中IndexerPlugin
,他会尝试每次执行时,都尝试进行ZOrdering改写,优化执行性能。
class IndexerPlugin extends ResultRender {
override def call(d: ResultResp): ResultResp = {
val params = JSONTool.parseJson[Map[String, String]](ScriptSQLExec.context().userDefinedParam.getOrElse("__PARAMS__", "{}"))
if (!params.getOrElse("enableQueryWithIndexer", "false").toBoolean) {
return d
}
val consoleUrl = ScriptSQLExec.context().userDefinedParam.getOrElse("__default__console_url__", "")
val auth_secret = ScriptSQLExec.context().userDefinedParam.getOrElse("__auth_secret__", "")
val metaClient = new RestIndexerMeta(consoleUrl, auth_secret)
val indexer = new LinearTryIndexerSelector(Seq(new ZOrderingIndexer), metaClient)
val finalLP = indexer.rewrite(d.df.queryExecution.analyzed, Map())
val sparkSession = ScriptSQLExec.context().execListener.sparkSession
val ds = DataSetHelper.create(sparkSession, finalLP)
ResultResp(ds, d.name)
}
}
对应的,ET,DS等如何开发,我们在后续文章中会继续讲解。在这篇文章里,我们可以看到,我们开发的这些插件是如何通过APP插件注册进去的。