玩转MLSQL Python插件
MLSQL支持Python插件,我们写的python代码用户可以指定是在Driver侧还是Executor运行。通过PyJava API,用户可以将更加复杂的 计算逻辑或者算法训练发送给Ray执行,此时,在Driver端或者Executor端运行Python程序,本质是一个Ray client.
今天在这篇文章,我们会介绍如何配置Python环境,以及一些常见的问题。
环境依赖
在运行MLSQL Driver(Executor节点可选)的节点上,需要有Python环境。如果你使用yarn,推荐使用Conda管理Python环境。如果你使用K8s,可以直接使用镜像管理。
Python环境最好是3.6版本,
针对Ray 0.8.0, 并且请安装如下依赖:
pip install Cython
pip install pyarrow==0.10.0
pip install ray==0.8.0
pip install aiohttp psutil setproctitle grpcio pandas xlsxwriter
pip install watchdog requests click uuid sfcli
pip install pyjava==0.2.8.3
针对Ray 1.3.0(推荐), 并且请安装如下依赖:
pip install Cython
pip install ray==1.3.0
pip install aiohttp psutil setproctitle grpcio pandas xlsxwriter
pip install watchdog requests click uuid sfcli
pip install pyjava>=0.2.8.8
如果pyjava>=0.2.8.8安装出错,一般而言是你使用了pip 阿里云镜像更新延迟导致的。你可以临时去掉镜像来安装。
PyJava和Ray版本匹配
- PyJava 0.2.8.3 只支持 Ray 0.8.0.
- PyJava 0.2.8.5 只支持Ray 1.0之后的版本。
Ray 0.8.0
对于Ray 0.8.0 版本,如果你要使用Ray做计算,请确保Driver节点(Executor节点可选)按如下方式启动ray worker:
ray start --address=<address> --num-cpus=0 --num-cpus=0
其中address地址为Ray集群地址,类似123.45.67.89:6379这样。同时我们将cpu,gpus等资源设置为0. 启动一个没有资源的ray worker ,是因为我们需要通过ray worker提交任务。
Ray 1.2.0
Ray 1.2.0版本不再需要Driver(Executor节点可选)上启动ray worker。但需要在ray header节点启动一个server:
python -m ray.util.client.server [--host host_ip] [--port port] [--redis-address address] [--redis-password password]
在MLSQL中,RayContext.connect(globals(),[Address])
的第二个参数,需要配置这个新地址。
Ray 1.3.0((推荐))
无需做任何配置。只需要在MLSQL中按如下方式连接:
"RayContext.connect(globals(),<head_node_host>:10001)"
MLSQL Python插件基础知识
MLSQL 通过插件 Ray来支持Python,可以实现使用Python对SQL表进行数据处理亦或是AI建模。大家经常看到的代码如下:
!python env "PYTHON_ENV=:";
!python conf "runIn=driver";
!python conf "schema=st(field(title,string),field(body,string))";
!python conf "dataMode=data";
!ray on data '''
import ray
from pyjava.api.mlsql import RayContext
import numpy as np;
ray_context = RayContext.connect(globals(),"auto")
def echo(row):
row1 = {}
row1["title"]="jackm"
row1["body"]= row["body"]
return row1
ray_context.foreach(echo)
''' named newdata;
select * from newdata as output;
实际上, !ray on data
这个不过是个语法糖,系统会自动将其转换成如下代码执行:
run command as Ray.`` where
inputTable="vega_datasets" and
outputTable="newdata" and
code='''
import ray
from pyjava.api.mlsql import RayContext
import numpy as np;
ray_context = RayContext.connect(globals(),"auto")
def echo(row):
row1 = {}
row1["title"]="jackm"
row1["body"]= row["body"]
return row1
ray_context.foreach(echo)
''';
这里的Ray就是我们的一个插件。
我们可以通过!python命令对我们即将执行的Python代码做一些配置,这包括:
- 通过
!python env "PYTHON_ENV=source /Users/allwefantasy/opt/anaconda3/bin/activate dev";
来配置python环境 - 通过
!python conf "schema=st(field(k1,string),field(k2,string))";
来配置Python的输出Schema - 通过
!python conf "runIn=driver";
来配置Python Client代码是跑在Executor上还是Driver上(另外一个参数是executor
)。 - 通过
!python conf "dataMode=data";
来决定是不是会使用Ray的一些高阶API。另外一个参数是"model".如果大家不适用Python做分布式的ETL,那么使用"model"类型即可,系统会将数据收集到一个python进程进行处理。
更多信息可以参考MLSQL Python支持。
如何在python中动态一些参数
我们看下如下一个场景:
select "127.0.0.1" as rayAddress as dynamicConfTable;
set rayAddress=`select * from dynamicConfTable` where type="sql" and mode="runtime";
set rayAddress=``
run command as Ray.`` where
inputTable="vega_datasets" and
outputTable="result" and
code='''
from pyjava.api.mlsql import PythonContext,RayContext
ray_context = RayContext.connect(globals(),"${rayAddress}")
# 处理数据
items = ray_context.collect()
context.build_result([{"k1":"rayAddress","k2":"123"}])
''';
这里,dynamicConfTable可能是动态生成的一些配置选项,我们希望通过这个表里配置选项去控制我们python代码的行为,一个典型的例子是我们ray的地址可能根据程序需要是动态变化的。这个时候试图使用类型为sql
的set
语法动态设置ray地址,然后在python代码中获取,最后会发现在python中获取的地址为空。这是因为,使用字面量${variable}
的方式,是编译时渲染的,而set语法在mode="runtime"时,只有在运行时才会执行,编译时是不会进行求值的。那将mode设置为compile
呢,这个时候会有另外一个问题,因为rayAddress变量里引用了dynamicConfTable,而dynamicConfTable只有在运行时才会存在,系统在compile 时去求rayAddress的值,就会出现表不存在的问题。
那这个问题该如何解决呢?Ray插件提供了一个新的参数叫confTable
,允许用户传递只有两个字段的表进去(等价于kv类型)。然后在python中通过context.conf[键值]
来获取。大家看下面的代码例子:
load delta.`python_data.vega_datasets`
as vega_datasets;
select "rayAddress" as k,"127.0.0.1" as v as pythonConf;
!python env "PYTHON_ENV=source /Users/allwefantasy/opt/anaconda3/bin/activate dev";
!python conf "schema=st(field(k1,string),field(k2,string))";
run command as Ray.`` where
inputTable="vega_datasets" and
outputTable="result" and
confTable="pythonConf" and
code='''
from pyjava.api.mlsql import PythonContext,RayContext
ray_context = RayContext.connect(globals(),None)
# 处理数据
items = ray_context.collect()
# 获取配置表的参数
v=context.conf["rayAddress"]
context.build_result([{"k1":"rayAddress","k2":v}])
''';
select * from result as output;
我们通过confTable可以指定一张只有两个字段的表,然后接着就可以在Python中通过contex.conf
获取这张表的内容。这样我们就可以动态控制python的代码了。