玩转MLSQL Python插件

MLSQL支持Python插件,我们写的python代码用户可以指定是在Driver侧还是Executor运行。通过PyJava API,用户可以将更加复杂的 计算逻辑或者算法训练发送给Ray执行,此时,在Driver端或者Executor端运行Python程序,本质是一个Ray client.

今天在这篇文章,我们会介绍如何配置Python环境,以及一些常见的问题。

环境依赖

在运行MLSQL Driver(Executor节点可选)的节点上,需要有Python环境。如果你使用yarn,推荐使用Conda管理Python环境。如果你使用K8s,可以直接使用镜像管理。

Python环境最好是3.6版本,

针对Ray 0.8.0, 并且请安装如下依赖:

pip install Cython
pip install pyarrow==0.10.0
pip install ray==0.8.0
pip install aiohttp psutil setproctitle grpcio pandas xlsxwriter
pip install watchdog requests click uuid sfcli
pip install pyjava==0.2.8.3

针对Ray 1.3.0(推荐), 并且请安装如下依赖:

pip install Cython
pip install ray==1.3.0
pip install aiohttp psutil setproctitle grpcio pandas xlsxwriter
pip install watchdog requests click uuid sfcli
pip install pyjava>=0.2.8.8

如果pyjava>=0.2.8.8安装出错,一般而言是你使用了pip 阿里云镜像更新延迟导致的。你可以临时去掉镜像来安装。

PyJava和Ray版本匹配

  1. PyJava 0.2.8.3 只支持 Ray 0.8.0.
  2. PyJava 0.2.8.5 只支持Ray 1.0之后的版本。

Ray 0.8.0

对于Ray 0.8.0 版本,如果你要使用Ray做计算,请确保Driver节点(Executor节点可选)按如下方式启动ray worker:

ray start --address=<address> --num-cpus=0 --num-cpus=0

其中address地址为Ray集群地址,类似123.45.67.89:6379这样。同时我们将cpu,gpus等资源设置为0. 启动一个没有资源的ray worker ,是因为我们需要通过ray worker提交任务。

Ray 1.2.0

Ray 1.2.0版本不再需要Driver(Executor节点可选)上启动ray worker。但需要在ray header节点启动一个server:

python -m ray.util.client.server [--host host_ip] [--port port] [--redis-address address] [--redis-password password]

在MLSQL中,RayContext.connect(globals(),[Address])的第二个参数,需要配置这个新地址。

Ray 1.3.0((推荐))

无需做任何配置。只需要在MLSQL中按如下方式连接:

"RayContext.connect(globals(),<head_node_host>:10001)"

MLSQL Python插件基础知识

MLSQL 通过插件 Ray来支持Python,可以实现使用Python对SQL表进行数据处理亦或是AI建模。大家经常看到的代码如下:


!python env "PYTHON_ENV=:";
!python conf "runIn=driver";
!python conf "schema=st(field(title,string),field(body,string))";
!python conf "dataMode=data";

!ray on data '''

import ray
from pyjava.api.mlsql import RayContext
import numpy as np;

ray_context = RayContext.connect(globals(),"auto")

def echo(row):
    row1 = {}
    row1["title"]="jackm"
    row1["body"]= row["body"]
    return row1

ray_context.foreach(echo)

''' named newdata;

select * from newdata as output;

实际上, !ray on data这个不过是个语法糖,系统会自动将其转换成如下代码执行:

run command as Ray.`` where 
inputTable="vega_datasets" and 
outputTable="newdata" and
code='''

import ray
from pyjava.api.mlsql import RayContext
import numpy as np;

ray_context = RayContext.connect(globals(),"auto")

def echo(row):
    row1 = {}
    row1["title"]="jackm"
    row1["body"]= row["body"]
    return row1

ray_context.foreach(echo)
''';

这里的Ray就是我们的一个插件。

我们可以通过!python命令对我们即将执行的Python代码做一些配置,这包括:

  1. 通过 !python env "PYTHON_ENV=source /Users/allwefantasy/opt/anaconda3/bin/activate dev";来配置python环境
  2. 通过 !python conf "schema=st(field(k1,string),field(k2,string))";来配置Python的输出Schema
  3. 通过!python conf "runIn=driver";来配置Python Client代码是跑在Executor上还是Driver上(另外一个参数是executor)。
  4. 通过!python conf "dataMode=data"; 来决定是不是会使用Ray的一些高阶API。另外一个参数是"model".如果大家不适用Python做分布式的ETL,那么使用"model"类型即可,系统会将数据收集到一个python进程进行处理。

更多信息可以参考MLSQL Python支持

如何在python中动态一些参数

我们看下如下一个场景:

select "127.0.0.1" as rayAddress as dynamicConfTable;
set rayAddress=`select * from dynamicConfTable` where type="sql" and mode="runtime";

set rayAddress=``
run command as Ray.`` where 
inputTable="vega_datasets" and 
outputTable="result" and
code='''

from pyjava.api.mlsql import PythonContext,RayContext

ray_context = RayContext.connect(globals(),"${rayAddress}")
# 处理数据
items = ray_context.collect()

context.build_result([{"k1":"rayAddress","k2":"123"}])

''';

这里,dynamicConfTable可能是动态生成的一些配置选项,我们希望通过这个表里配置选项去控制我们python代码的行为,一个典型的例子是我们ray的地址可能根据程序需要是动态变化的。这个时候试图使用类型为sqlset语法动态设置ray地址,然后在python代码中获取,最后会发现在python中获取的地址为空。这是因为,使用字面量${variable}的方式,是编译时渲染的,而set语法在mode="runtime"时,只有在运行时才会执行,编译时是不会进行求值的。那将mode设置为compile呢,这个时候会有另外一个问题,因为rayAddress变量里引用了dynamicConfTable,而dynamicConfTable只有在运行时才会存在,系统在compile 时去求rayAddress的值,就会出现表不存在的问题。

那这个问题该如何解决呢?Ray插件提供了一个新的参数叫confTable,允许用户传递只有两个字段的表进去(等价于kv类型)。然后在python中通过context.conf[键值]来获取。大家看下面的代码例子:

load delta.`python_data.vega_datasets` 
as vega_datasets;

select "rayAddress" as k,"127.0.0.1" as v as pythonConf;

!python env "PYTHON_ENV=source /Users/allwefantasy/opt/anaconda3/bin/activate dev";
!python conf "schema=st(field(k1,string),field(k2,string))";

run command as Ray.`` where 
inputTable="vega_datasets" and 
outputTable="result" and
confTable="pythonConf" and
code='''

from pyjava.api.mlsql import PythonContext,RayContext

ray_context = RayContext.connect(globals(),None)
# 处理数据
items = ray_context.collect()
# 获取配置表的参数
v=context.conf["rayAddress"]
context.build_result([{"k1":"rayAddress","k2":v}])

''';

select * from result as output;

我们通过confTable可以指定一张只有两个字段的表,然后接着就可以在Python中通过contex.conf获取这张表的内容。这样我们就可以动态控制python的代码了。

results matching ""

    No results matching ""