Ray Actor Unexpected Exit: Background and Reproduction Steps
Background
# Get a worker pool
udfMaster = ray.get_actor("model_predict")
# Take a worker from the pool; the worker is itself an actor
worker = ray.get(udfMaster.get.remote())
# Return the worker to the pool once we are done with it
udfMaster.give_back.remote(worker)
The last statement fails in one particular environment (pyjava) and causes the worker actor to exit unexpectedly.
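Reproduction steps
First, make sure the conda-managed environment has Ray 1.3.0 or the latest dev build installed.
Next, run the following script to register a few actors: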
import ray

ray.util.connect("127.0.0.1:10001", namespace="default")


@ray.remote
class UDFMaster(object):
    def __init__(self, num, conf):
        # model_servers = RayContext.parse_servers(conf["modelServers"])
        # items = RayContext.collect_from(model_servers)
        # model_refs = [ray.put(item) for item in items]
        self._idle_actors = [UDFWorker.remote(None, conf) for _ in range(num)]

    def get(self):
        return self._idle_actors.pop()

    def give_back(self, v):
        self._idle_actors.append(v)

    def shutdown(self):
        for worker in self._idle_actors:
            ray.kill(worker)
        return ""


@ray.remote
class UDFWorker(object):
    def __init__(self, model_refs, conf):
        print("worker init")
        # from tensorflow.keras import models
        # self.model_path = "./tmp/model/{}".format(str(uuid.uuid4()))
        # streaming_tar.save_rows_as_file((ray.get(ref) for ref in model_refs), self.model_path)
        # models.load_model(self.model_path)

    def apply(self, v):
        return {"value": [[1.0, 2.0]]}

    def shutdown(self):
        pass


# Tear down any pool left over from a previous run; ignore the error if none exists.
try:
    udfMaster = ray.get_actor("model_predict")
    ray.get(udfMaster.shutdown.remote())
    ray.kill(udfMaster)  # ray.kill returns None, so there is nothing to ray.get
except Exception:
    pass

UDFMaster.options(name="model_predict", lifetime="detached").remote(3, {})
This starts an actor pool with three worker actors.
Next, download the pyjava project:
git clone https://github.com/allwefantasy/pyjava.git
Go into the project's python directory and run ./install.sh to install pyjava.
Then import the project into your IDE, find tech.mlsql.test.JavaApp1Spec, and change
def condaEnv = "source /Users/allwefantasy/opt/anaconda3/bin/activate ray-dev"
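so that it activates your own conda environment, that is, the environment that contains both Ray 1.3.0 (or a Ray dev build) and pyjava.
Now run the example. It fails with the following error: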
The actor died unexpectedly before finishing this task.
(b'\x80\x05\x95"\x00\x00\x00\x00\x00\x00\x00\x8c\x08builtins\x94\x8c\x0eAsse'
b'rtionError\x94\x93\x94)R\x94.')
AssertionError()
Traceback (most recent call last):
  File "/Users/allwefantasy/opt/anaconda3/envs/ray-dev/lib/python3.6/site-packages/pyjava/worker.py", line 155, in main
    process()
  File "/Users/allwefantasy/opt/anaconda3/envs/ray-dev/lib/python3.6/site-packages/pyjava/worker.py", line 132, in process
    exec(code, n_local, n_local)
  File "<string>", line 15, in <module>
  File "/Users/allwefantasy/projects/ray/python/ray/util/client/common.py", line 266, in remote
    return return_refs(ray.call_remote(self, *args, **kwargs))
  File "/Users/allwefantasy/projects/ray/python/ray/util/client/api.py", line 103, in call_remote
    return self.worker.call_remote(instance, *args, **kwargs)
  File "/Users/allwefantasy/projects/ray/python/ray/util/client/worker.py", line 328, in call_remote
    return self._call_schedule_for_task(task)
  File "/Users/allwefantasy/projects/ray/python/ray/util/client/worker.py", line 345, in _call_schedule_for_task
    raise cloudpickle.loads(ticket.error)
AssertionError
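The byte string printed before the traceback appears to be the serialized exception that the last frame rebuilds with cloudpickle.loads(ticket.error). As a rough check, the same payload can be decoded with the standard pickle module. This is only a sketch: it assumes Python 3.8 or newer, because the payload uses pickle protocol 5, and the names payload and err are introduced here for illustration.

```python
import pickle

# Payload copied verbatim from the error output above.
payload = (b'\x80\x05\x95"\x00\x00\x00\x00\x00\x00\x00\x8c\x08builtins\x94\x8c\x0eAsse'
           b'rtionError\x94\x93\x94)R\x94.')

# Only unpickle bytes you trust; this payload comes from our own error log.
err = pickle.loads(payload)

print(repr(err))  # AssertionError()
print(err.args)   # ()
```

The decoded exception carries no message, so the payload alone does not say which assertion failed.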
The problem comes mainly from this line:
udfMaster.give_back.remote(worker)
If the sample code above is taken out and run from a standalone Python file, the problem does not occur.
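For comparison, the control case can be kept as a small standalone script. The sketch below is one possible way to write it, not the author's exact file: round_trip, resolve and main are names introduced here for illustration, and passing None to apply is an assumption (the UDFWorker above ignores its argument). It expects the "model_predict" pool from the registration script to be running already.

```python
def round_trip(master, resolve):
    """Borrow one worker from the pool, call it once, and hand it back.

    `master` is the pool actor handle; `resolve` turns an object ref into
    a value (ray.get when talking to a real cluster).
    """
    worker = resolve(master.get.remote())
    try:
        result = resolve(worker.apply.remote(None))
    finally:
        # Resolve give_back as well, so a failure is raised here rather
        # than being left in an object ref that nobody reads.
        resolve(master.give_back.remote(worker))
    return result


def main():
    # Imported here so round_trip can be exercised without Ray installed.
    import ray

    ray.util.connect("127.0.0.1:10001", namespace="default")
    master = ray.get_actor("model_predict")
    print(round_trip(master, ray.get))
```

Calling main() from a plain Python process exercises the same get / give_back sequence outside pyjava, which is the case reported above to work.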