MLSQL插件开发系列2-引擎生命周期扩展点

对应版本 2.1.0-SNAPSHOT

MLSQL生命周期有两个扩展点:

  1. MLSQLPlatformLifecycle
  2. MLSQLRuntimeLifecycle

MLSQLPlatformLifecycle

MLSQLPlatformLifecycle对应的接口为:

trait MLSQLPlatformLifecycle {

  def beforeRuntime(params: Map[String, String]): Unit

  def afterRuntime(runtime: StreamingRuntime, params: Map[String, String]): Unit

  def beforeDispatcher(runtime: StreamingRuntime, params: Map[String, String]): Unit

  def afterDispatcher(runtime: StreamingRuntime, params: Map[String, String]): Unit
}

可以看到,用户有四个时机可以嵌入,分别是在Runtime启动前,Runtime启动后,以及任务调度前,任务调度后。不过,任务调度前,任务调度后已经是遗留的任务了。

因为在MLSQL设计之初,是准备支持多种不同类型引擎的,比如SparkRuntime,FlinkRuntime。所以这里所谓Runtime,其实就是对应引擎的的Session。那么beforeRuntime就是在构建引擎之前,此时嵌入的插件,可以修改Runtime构建行为。afterRuntime则是构建引擎之后,此时嵌入插件,你可以对Runtime的Session进行一些初始化。

在MLSQL中,比如日志服务支持(LogFileHook),以及插件体系扩展点(PluginHook)都实现了MLSQLPlatformLifecycle来完成的。

自定义扩展可以通过-streaming.platform_hooks 配置。

MLSQLRuntimeLifecycle

MLSQLRuntimeLifecycle 的接口申明如下:

trait MLSQLRuntimeLifecycle {
  def beforeRuntimeStarted(params: Map[String, String], conf: SparkConf): Unit

  def afterRuntimeStarted(params: Map[String, String], conf: SparkConf, rootSparkSession: SparkSession): Unit

}

该接口的实现类是在对应的Runtime里面被调用的。比如,如果用户使用的SparkRuntime,那么如果实现该接口的类,都是被SparkRuntime调用。目前,MLSQLRuntimeLifecycle仅支持SparkRuntime,因为他直接引入了Spark相关的接口。 beforeRuntimeStarted可以让你修改SparkConf从而控制构建SparkSession的行为,afterRuntimeStarted则用户可以获取到Root Session。

在MLSQL中,默认插件安装信息是放到数据湖里的,但是也支持存储到MySQL数据库,而这个支持就是通过MetaStoreService(该类实现了MLSQLRuntimeLifecycle)来完成的,该类会根据参数指定的db.yml文件连接MySQL数据库,并且提供一个实现了和数据湖类似接口的实例操作数据库,从而能够完成透明替换数据湖存储插件安装信息。

实际很多需求都可以通过实现MLSQLRuntimeLifecycle来完成。比如,很多用户希望能够使用MLSQL语言替换Java/Scala/PySpark去写Spark程序,除此之外,他希望引擎的提交方式要和传统的Spark完全类似,执行完成后就自动退出,而非现在作为标准Web服务对外提供服务。那么我们就可以实现一个类,在afterRuntimeStarted执行实际的MLSQL脚本,然后启动时,将streaming.rest设置为false即可。

自定义扩展可以通过-streaming.runtime_hooks 配置。

results matching ""

    No results matching ""