MLSQL插件开发系列3-插件扩展点

对应版本 2.1.0-SNAPSHOT

MLSQL PluginHook支持四种插件:

  1. APP
  2. DS
  3. ET
  4. SCRIPT

APP插件扩展点,本质上是执行一些自定义代码逻辑。这意味这在引擎启动后,你可以完成任何你想要的代码。比如初始化其他三类插件都是可以的。所以APP插件是最灵活的。

DS 则是数据源插件,你可以实现一些官方没有实现的数据源,从而使得MLSQL能够操作他们。 ET则可以扩展run/train语法。 Script插件是允许你把MLSQL脚本打包成jar包实现部署和安装。

APP插件可以通过-streaming.plugin.clzznames指定亦或是网络安装。可参考MLSQL插件安装

其他插件可以通过包装成APP插件的方式注册亦或是通过!plugin来安装。

插件举例

mlsql-sql-profiler 是MLSQL的一个内置插件,该插件是一个APP插件,但是内部其实包含了很多ET插件,Controller扩展点等。

我们可以看看入口实现:

class ProfilerApp extends tech.mlsql.app.App with VersionCompatibility {

这里,你需要实现App和VersionCompatibility两个接口,分别要实现run以及supportedVersions两个函数。 其中run是在Runtime启动回调执行。所以相当于提供了一个钩子给你。在ProfilerApp中,我们注册很多ET组件:

override def run(args: Seq[String]): Unit = {
    AppRuntimeStore.store.registerController("genSQL", classOf[GenSQLController].getName)
    AppRuntimeStore.store.registerController("indexRewrite", classOf[IndexerRewriteController].getName)
    ETRegister.register(ProfilerApp.MODULE_NAME, classOf[ProfilerCommand].getName)
    ETRegister.register("ZOrdering", classOf[ZOrdering].getName)
    ETRegister.register("CubeIndexerBuilder", classOf[CubeIndexerBuilder].getName)
    ETRegister.register("CubeIndexerQuery", classOf[CubeIndexerQuery].getName)
    CommandCollection.refreshCommandMapping(Map(ProfilerApp.COMMAND_NAME -> ProfilerApp.MODULE_NAME))
    AppRuntimeStore.store.registerResultRender("IndexerPlugin", classOf[IndexerPlugin].getName)
  }

从名字可以看到,我们对于我们实现的一些插件或者扩展点,系统提供了注册类。AppRuntimeStore.store.registerController允许你在/run/script接口中注册你自己的逻辑,行为通过executeMode参数控制.

ETRegister则是注册ET插件了。比如这里我们注册了ZOrdering插件,现在,我们可以在代码里这么写:

train dataTable as ZOrdering.`` 
where indexFields="status,mlsql_user_id,created_at"
and fileNum="5"
as newDF;

当系统遇到这段代码的时候,就会去调用classOf[ZOrdering]

大家可能还注意到 CommandCollection.refreshCommandMapping(Map(ProfilerApp.COMMAND_NAME -> ProfilerApp.MODULE_NAME)),这个代码是用来干什么的呢?其实就是允许你把run/train语句转化成一个命令。比如对于上面的ZOrdering插件,假设我们注册为一个命令:

CommandCollection.refreshCommandMapping(Map("zorder" -> "ZOrdering"))

那么我们就可以在MLSQL中这么使用:

!zorder _ -input dataTable -indexFields "status,mlsql_user_id,created_at";

最后还有一个代码:

AppRuntimeStore.store.registerResultRender

registerResultRender 是用来干嘛的呢?他允许你修改最后的逻辑执行计划。你不如你写了如下一条SQL语句:

select 1 as a as b;

执行的时候,系统会获得一个LogialPlan,这个时候你可以在真正执行这个LP之前,修改LP,从而实现一些改写行为。比如在示例中IndexerPlugin,他会尝试每次执行时,都尝试进行ZOrdering改写,优化执行性能。

class IndexerPlugin extends ResultRender {
  override def call(d: ResultResp): ResultResp = {
    val params = JSONTool.parseJson[Map[String, String]](ScriptSQLExec.context().userDefinedParam.getOrElse("__PARAMS__", "{}"))
    if (!params.getOrElse("enableQueryWithIndexer", "false").toBoolean) {
      return d
    }
    val consoleUrl = ScriptSQLExec.context().userDefinedParam.getOrElse("__default__console_url__", "")
    val auth_secret = ScriptSQLExec.context().userDefinedParam.getOrElse("__auth_secret__", "")
    val metaClient = new RestIndexerMeta(consoleUrl, auth_secret)
    val indexer = new LinearTryIndexerSelector(Seq(new ZOrderingIndexer), metaClient)
    val finalLP = indexer.rewrite(d.df.queryExecution.analyzed, Map())
    val sparkSession = ScriptSQLExec.context().execListener.sparkSession
    val ds = DataSetHelper.create(sparkSession, finalLP)
    ResultResp(ds, d.name)
  }
}

对应的,ET,DS等如何开发,我们在后续文章中会继续讲解。在这篇文章里,我们可以看到,我们开发的这些插件是如何通过APP插件注册进去的。

results matching ""

    No results matching ""