MLSQL插件开发系列2-引擎生命周期扩展点
对应版本 2.1.0-SNAPSHOT
MLSQL生命周期有两个扩展点:
- MLSQLPlatformLifecycle
- MLSQLRuntimeLifecycle
MLSQLPlatformLifecycle
MLSQLPlatformLifecycle对应的接口为:
trait MLSQLPlatformLifecycle {
def beforeRuntime(params: Map[String, String]): Unit
def afterRuntime(runtime: StreamingRuntime, params: Map[String, String]): Unit
def beforeDispatcher(runtime: StreamingRuntime, params: Map[String, String]): Unit
def afterDispatcher(runtime: StreamingRuntime, params: Map[String, String]): Unit
}
可以看到,用户有四个时机可以嵌入,分别是在Runtime启动前,Runtime启动后,以及任务调度前,任务调度后。不过,任务调度前,任务调度后已经是遗留的任务了。
因为在MLSQL设计之初,是准备支持多种不同类型引擎的,比如SparkRuntime,FlinkRuntime。所以这里所谓Runtime,其实就是对应引擎的的Session。那么beforeRuntime就是在构建引擎之前,此时嵌入的插件,可以修改Runtime构建行为。afterRuntime则是构建引擎之后,此时嵌入插件,你可以对Runtime的Session进行一些初始化。
在MLSQL中,比如日志服务支持(LogFileHook),以及插件体系扩展点(PluginHook)都实现了MLSQLPlatformLifecycle来完成的。
自定义扩展可以通过-streaming.platform_hooks
配置。
MLSQLRuntimeLifecycle
MLSQLRuntimeLifecycle 的接口申明如下:
trait MLSQLRuntimeLifecycle {
def beforeRuntimeStarted(params: Map[String, String], conf: SparkConf): Unit
def afterRuntimeStarted(params: Map[String, String], conf: SparkConf, rootSparkSession: SparkSession): Unit
}
该接口的实现类是在对应的Runtime里面被调用的。比如,如果用户使用的SparkRuntime,那么如果实现该接口的类,都是被SparkRuntime调用。目前,MLSQLRuntimeLifecycle仅支持SparkRuntime,因为他直接引入了Spark相关的接口。
beforeRuntimeStarted
可以让你修改SparkConf从而控制构建SparkSession的行为,afterRuntimeStarted
则用户可以获取到Root Session。
在MLSQL中,默认插件安装信息是放到数据湖里的,但是也支持存储到MySQL数据库,而这个支持就是通过MetaStoreService(该类实现了MLSQLRuntimeLifecycle)来完成的,该类会根据参数指定的db.yml文件连接MySQL数据库,并且提供一个实现了和数据湖类似接口的实例操作数据库,从而能够完成透明替换数据湖存储插件安装信息。
实际很多需求都可以通过实现MLSQLRuntimeLifecycle
来完成。比如,很多用户希望能够使用MLSQL语言替换Java/Scala/PySpark去写Spark程序,除此之外,他希望引擎的提交方式要和传统的Spark完全类似,执行完成后就自动退出,而非现在作为标准Web服务对外提供服务。那么我们就可以实现一个类,在afterRuntimeStarted
执行实际的MLSQL脚本,然后启动时,将streaming.rest
设置为false即可。
自定义扩展可以通过-streaming.runtime_hooks
配置。