Ray Actor Abnormal Exit: Background and Reproduction Steps

Background

# Get the worker pool (a named actor)
udfMaster = ray.get_actor("model_predict")

# Take a worker from the pool; the worker is itself an actor
worker = ray.get(udfMaster.get.remote())

# Return the worker to the pool once you are done with it
udfMaster.give_back.remote(worker)

In a special environment, namely when the code runs inside pyjava, the last statement fails and the worker actor exits abnormally.
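The borrow-and-return protocol above can be modeled without Ray. The sketch below is a local, hypothetical stand-in: `LocalPool` mirrors the pool's `get` (pop from the idle list) and `give_back` (append to it) methods as defined in the `UDFMaster` actor of the reproduction script, and the `borrowed` context manager, also hypothetical, makes sure the worker goes back to the pool even if the work done in between raises. It only pins down the pool semantics the repro relies on; it does not reproduce the pyjava failure.

```python
from contextlib import contextmanager


class LocalPool:
    """Hypothetical in-process stand-in for the UDFMaster actor (no Ray involved)."""

    def __init__(self, workers):
        self._idle = list(workers)

    def get(self):
        # Like UDFMaster.get: pop the most recently added idle worker (LIFO)
        return self._idle.pop()

    def give_back(self, worker):
        # Like UDFMaster.give_back: put the worker back on the idle list
        self._idle.append(worker)

    def idle_count(self):
        return len(self._idle)


@contextmanager
def borrowed(pool):
    """Borrow a worker and always return it, even if the caller's code raises."""
    worker = pool.get()
    try:
        yield worker
    finally:
        pool.give_back(worker)


pool = LocalPool(["w1", "w2", "w3"])
with borrowed(pool) as w:
    print(w, pool.idle_count())  # w3 2
print(pool.idle_count())  # 3
```

With Ray, the same shape would put `worker = ray.get(udfMaster.get.remote())` before the `try` and `udfMaster.give_back.remote(worker)` in the `finally` clause.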

Reproduction Steps

First, make sure the conda-managed environment has Ray 1.3.0 or the latest dev build installed.

Next, run the following script to register a few actors:

import ray

ray.util.connect("127.0.0.1:10001", namespace="default")


@ray.remote
class UDFMaster(object):
    def __init__(self, num, conf):
        # model_servers = RayContext.parse_servers(conf["modelServers"])
        # items = RayContext.collect_from(model_servers)
        # model_refs = [ray.put(item) for item in items]
        self._idle_actors = [UDFWorker.remote(None, conf) for _ in range(num)]

    def get(self):
        return self._idle_actors.pop()

    def give_back(self, v):
        self._idle_actors.append(v)

    def shutdown(self):
        # Kill every idle worker actor still held by the pool
        for worker in self._idle_actors:
            ray.kill(worker)
        return ""


@ray.remote
class UDFWorker(object):
    def __init__(self, model_refs, conf):
        print("worker init")
        # from tensorflow.keras import models
        # self.model_path = "./tmp/model/{}".format(str(uuid.uuid4()))
        # streaming_tar.save_rows_as_file((ray.get(ref) for ref in model_refs), self.model_path)
        # models.load_model(self.model_path)

    def apply(self, v):
        return {"value": [[1.0, 2.0]]}

    def shutdown(self):
        pass


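# Clean up a pool left over from a previous run, if there is one
try:
    udfMaster = ray.get_actor("model_predict")
    ray.get(udfMaster.shutdown.remote())
    # ray.kill returns None, so it must not be wrapped in ray.get
    ray.kill(udfMaster)
except Exception:
    # No actor named "model_predict" yet: nothing to clean up
    pass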

UDFMaster.options(name="model_predict", lifetime="detached").remote(3, {})

This starts an actor pool containing three worker actors.

Next, clone the pyjava project:

git clone https://github.com/allwefantasy/pyjava.git

Go into the project's python directory and run ./install.sh to install pyjava.

Import the project into your IDE, open tech.mlsql.test.JavaApp1Spec, and change

def condaEnv = "source /Users/allwefantasy/opt/anaconda3/bin/activate ray-dev"

so that it activates your own conda environment, i.e. the one that has Ray 1.3.0 (or the Ray dev build) and pyjava installed.

Now run the example. It fails with the following error:

The actor died unexpectedly before finishing this task.
(b'\x80\x05\x95"\x00\x00\x00\x00\x00\x00\x00\x8c\x08builtins\x94\x8c\x0eAsse'
 b'rtionError\x94\x93\x94)R\x94.')
AssertionError()


Traceback (most recent call last):
  File "/Users/allwefantasy/opt/anaconda3/envs/ray-dev/lib/python3.6/site-packages/pyjava/worker.py", line 155, in main
    process()
  File "/Users/allwefantasy/opt/anaconda3/envs/ray-dev/lib/python3.6/site-packages/pyjava/worker.py", line 132, in process
    exec(code, n_local, n_local)
  File "<string>", line 15, in <module>
  File "/Users/allwefantasy/projects/ray/python/ray/util/client/common.py", line 266, in remote
    return return_refs(ray.call_remote(self, *args, **kwargs))
  File "/Users/allwefantasy/projects/ray/python/ray/util/client/api.py", line 103, in call_remote
    return self.worker.call_remote(instance, *args, **kwargs)
  File "/Users/allwefantasy/projects/ray/python/ray/util/client/worker.py", line 328, in call_remote
    return self._call_schedule_for_task(task)
  File "/Users/allwefantasy/projects/ray/python/ray/util/client/worker.py", line 345, in _call_schedule_for_task
    raise cloudpickle.loads(ticket.error)
AssertionError
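The byte string in the error message is a pickled exception: the traceback shows the client raising `cloudpickle.loads(ticket.error)`. Decoding those exact bytes with the standard `pickle` module, as in the sketch below, shows that the payload is just a bare `AssertionError` with no message, so the message seen on the client side carries no detail about which assertion failed; that detail likely has to be found in the logs of the process that raised it. The bytes use pickle protocol 5, which needs Python 3.8+ (or the `pickle5` backport on older versions such as the Python 3.6 environment in the traceback).

```python
import pickle

# Bytes copied verbatim from the error message above;
# the two adjacent bytes literals are concatenated by Python.
payload = (b'\x80\x05\x95"\x00\x00\x00\x00\x00\x00\x00\x8c\x08builtins\x94\x8c\x0eAsse'
           b'rtionError\x94\x93\x94)R\x94.')

# Only unpickle data you trust; here it is the payload quoted in this document.
err = pickle.loads(payload)
print(type(err).__name__, err.args)  # AssertionError ()
```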

The failure comes from this line:

udfMaster.give_back.remote(worker)

If the example code above is copied into a standalone Python file and run directly, it works without any problem.
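One visible difference between the two runs, taken from the traceback, is that pyjava executes the snippet with `exec(code, n_local, n_local)` (pyjava/worker.py, line 132) instead of running it as a script. As a hedged illustration of how that execution context can differ, the Ray-free sketch below compares `__name__` under both styles. The one-line `snippet` is hypothetical, and starting from an empty `n_local` is an assumption, since the traceback does not show what pyjava puts in it. Nothing in this section establishes that `__name__` is what triggers the `AssertionError`; this is only a starting point for narrowing down the difference.

```python
# Hypothetical one-line stand-in for the code that pyjava executes
snippet = "runner_name = __name__\n"

# Roughly how pyjava runs user code, per the traceback: exec(code, n_local, n_local).
# Assumption: n_local starts empty.
n_local = {}
exec(snippet, n_local, n_local)
# No __name__ in the namespace, so lookup falls back to the builtins module's __name__
print(n_local["runner_name"])  # builtins

# Roughly how `python script.py` runs the same code: as the __main__ module
main_ns = {"__name__": "__main__"}
exec(snippet, main_ns, main_ns)
print(main_ns["runner_name"])  # __main__
```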
