Dask 部署更新
摘要
在过去的六个月里,许多 Dask 开发者致力于使 Dask 更易于在各种情况下部署。本文总结了这些工作,并提供了相关工作的链接。
我们所说的部署是什么
为了在集群上运行 Dask,你需要在一台机器上设置一个调度器
$ dask-scheduler
Scheduler running at tcp://192.168.0.1
并在许多其他机器上启动 Dask worker
$ dask-worker tcp://192.168.0.1
Waiting to connect to: tcp://scheduler:8786
$ dask-worker tcp://192.168.0.1
Waiting to connect to: tcp://scheduler:8786
$ dask-worker tcp://192.168.0.1
Waiting to connect to: tcp://scheduler:8786
$ dask-worker tcp://192.168.0.1
Waiting to connect to: tcp://scheduler:8786
对于非正式集群,人们可能会手动完成此操作,通过登录到每台机器并自行运行这些命令。然而,更常见的是使用集群资源管理器,例如 Kubernetes、Yarn (Hadoop/Spark)、HPC 批处理调度器(SGE, PBS, SLURM, LSF 等)、某些云服务或某些定制系统。
随着 Dask 被越来越多的机构使用并在这些机构内部更广泛地使用,使部署流畅和自然变得越来越重要。事实上,这一点非常重要,以至于有几个不同的团队在某些方面或另一方面改进部署方面进行了七次独立努力。
我们将在下面简要总结并链接到这些工作,然后我们将通过讨论一些有助于使这项工作更加一致的内部设计来结束。
Dask-SSH
根据我们的用户调查,最常见的部署机制仍然是 SSH。Dask 在一段时间内就有一个 命令行 dask-ssh 工具,以便更轻松地使用 SSH 进行部署。我们最近对其进行了更新,使其还包括一个 Python 接口,提供了更多的控制。
>>> from dask.distributed import Client, SSHCluster
>>> cluster = SSHCluster(
... ["host1", "host2", "host3", "host4"],
... connect_options={"known_hosts": None},
... worker_options={"nthreads": 2},
... scheduler_options={"port": 0, "dashboard_address": ":8797"}
... )
>>> client = Client(cluster)
这不是我们推荐给大型机构的方式,但对于刚入门的非正式团体来说可能会有所帮助。
Dask-Jobqueue 和 Dask-Kubernetes 重写
我们重写了针对 HPC 中心常见 SLURM/PBS/LSF/SGE 集群管理器的 Dask-Jobqueue 和 Dask-Kubernetes。它们现在与 Dask SSH 共享一个通用代码库,因此更加一致,并有望减少 bug。
理想情况下,用户在使用现有工作负载时应该不会注意到太大差异,但异步操作、与 Dask JupyterLab 扩展集成等新功能更加一致可用。此外,我们还能够统一开发并显著减轻维护负担。
这些更改所在的 Dask Jobqueue 新版本是 0.7.0,工作在 dask/dask-jobqueue #307 中完成。Dask Kubernetes 的新版本是 0.10.0,工作在 dask/dask-kubernetes #162 中完成。
Dask-CloudProvider
对于云部署,我们通常建议使用托管的 Kubernetes 或 Yarn 服务,然后在其之上使用 Dask-Kubernetes 或 Dask-Yarn。
然而,一些机构已经决定或承诺使用某些特定供应商的技术,使用更适合特定云的原生 API 会更方便。新的软件包 Dask Cloudprovider 目前为 Amazon 的 ECS API 处理了这个问题,该 API 已经存在了很长时间,并且更普遍地被接受。
from dask_cloudprovider import ECSCluster
cluster = ECSCluster(cluster_arn="arn:aws:ecs:<region>:<acctid>:cluster/<clustername>")
from dask_cloudprovider import FargateCluster
cluster = FargateCluster()
Dask-Gateway
在某些情况下,用户可能无法访问集群管理器。例如,机构可能不会授予所有数据科学用户访问 Yarn 或 Kubernetes 集群的权限。在这种情况下,Dask-Gateway 项目可能会有用。它可以启动和管理 Dask 作业,并在必要时提供到这些作业的代理连接。它通常以提升的权限部署,但由 IT 直接管理,从而为他们提供了更大的控制点。
GPU 和 Dask-CUDA
在使用 Dask 进行多 GPU 部署时,NVIDIA RAPIDS 需要能够指定越来越复杂的 Dask worker 设置。他们推荐以下部署策略:
- 一台机器上每个 GPU 一个 Dask worker
- 指定
CUDA_VISIBLE_DEVICES
环境变量,将该 worker 绑定到该 GPU - 如果你的机器有多个网络接口,则选择与该 GPU 连接最好的网络接口
- 如果你的机器有多个 CPU,则设置线程亲和性以使用最近的 CPU
- ... 等等
因此,我们希望在代码中指定这些配置,如下所示:
specification = {
"worker-0": {
"cls": dask.distributed.Nanny,
"options": {"nthreads": 1, "env": {"CUDA_VISIBLE_DEVICES": "0,1,2,3"}, interface="ib0"},
},
"worker-1": {
"cls": dask.distributed.Nanny,
"options": {"nthreads": 1, "env": {"CUDA_VISIBLE_DEVICES": "1,2,3,0"}, interface="ib0"},
},
"worker-2": {
"cls": dask.distributed.Nanny,
"options": {"nthreads": 1, "env": {"CUDA_VISIBLE_DEVICES": "2,3,0,1"}, interface="ib1"},
},
"worker-2": {
"cls": dask.distributed.Nanny,
"options": {"nthreads": 1, "env": {"CUDA_VISIBLE_DEVICES": "3,0,1,2"}, interface="ib1"},
},
}
以及用于部署这些 worker 的新 SpecCluster 类
cluster = SpecCluster(workers=specification)
我们在 Dask-CUDA 项目中使用了这种技术,为多 GPU 系统上的部署提供了便利的功能。
这个类足够通用,最终也成为了 SSH、Jobqueue 和 Kubernetes 解决方案的基础。
标准和约定
上述解决方案由在不同公司工作的不同团队构建。这很好,因为这些团队拥有在实际环境中处理集群管理器的实践经验,但在历史上,标准化用户体验一直有些挑战。当我们构建像 IPython widgets 或 Dask JupyterLab 扩展这样的其他工具时,这一点尤其具有挑战性,因为它们希望与所有 Dask 部署解决方案互操作。
最近对 Dask-SSH、Dask-Jobqueue、Dask-Kubernetes 以及新的 Dask-Cloudprovider 和 Dask-CUDA 库的重写,将它们全部置于相同的 dask.distributed.SpecCluster
超类下。因此,我们可以期望它们具有高度的一致性。此外,所有类现在都符合 dask.distributed.Cluster
接口,该接口标准化了适应性、IPython widgets、日志和一些基本报告等功能。
- Cluster
- SpecCluster
- Kubernetes
- JobQueue (PBS/SLURM/LSF/SGE/Torque/Condor/Moab/OAR)
- SSH
- CloudProvider (ECS)
- CUDA (LocalCUDACluster, DGX)
- LocalCluster
- Yarn
- Gateway
- SpecCluster
未来工作
还有很多工作要做。以下是我们在当前开发中看到的一些主题:
- 将调度器转移到网络中的独立作业/Pod/容器,这通常有助于复杂的网络情况
- 改善在这些情况下的仪表板代理
- 选择性地将集群的生命周期与请求集群的 Python 进程的生命周期分离
- 总结如何将 GPU 支持与所有集群管理器普遍结合的最佳实践
博客评论由 Disqus 驱动