引言

今天,我们将学习如何使用 Dask Helm Chart 在 Kubernetes 集群上部署 Dask,然后使用注释运行和扩展不同类型的 Worker。

Dask Helm Chart 是什么?

Dask Helm Chart 是使用 Helm(一个 Kubernetes 应用程序包管理器)部署 Dask 的一种便捷方式。使用 Dask Helm Chart 部署 Dask 后,我们可以连接到我们的 HelmCluster 并开始扩展 Worker。

Dask Kubernetes 是什么?

Dask Kubernetes 允许您在 Kubernetes 集群上部署和管理您的 Dask 部署。Dask Kubernetes Python 包有一个 HelmCluster 类(以及其他功能),使您能够从 Python 管理集群。在本教程中,我们将使用 HelmCluster 作为我们的集群管理器。

先决条件

  • 已安装 Helm 并能够运行 helm 命令
  • 已有一个正在运行的 Kubernetes 集群。无论您是使用 MiniKubeKind 在本地运行 Kubernetes,还是使用 AWS 或 GCP 等云提供商,都无关紧要。但您的集群需要能够访问 GPU 节点 才能运行 GPU Worker。您还需要安装 RAPIDS 来运行 GPU Worker 示例。
  • 已安装 kubectl。尽管这不是必需的。

就这些了,让我们开始吧!

安装 Dask Kubernetes

来自文档

pip install dask-kubernetes --upgrade

conda install dask-kubernetes -c conda-forge

安装 Dask Helm Chart

首先,使用 Helm 在 Kubernetes 上部署 Dask

helm repo add dask https://helm.dask.org/
helm repo update
helm install my-dask dask/dask

现在您的 Kubernetes 集群上应该正在运行 Dask。如果您安装了 kubectl,您可以运行 kubectl get all -n default

Default Dask Cluster Installed with Helm

您可以看到我们创建了一些资源!主要要知道的是,我们一开始有三个 Dask Worker。

向我们的 Dask 部署添加 GPU Worker 组

Helm Chart 具有开箱即用的默认值,用于在 Kubernetes 上部署我们的 Dask 集群。但是现在,因为我们想创建一些 GPU Worker,我们需要更改 Dask Helm Chart 中的默认值。为了做到这一点,我们可以复制当前的 values.yaml 文件,更新它以添加一个 GPU Worker 组,然后更新我们的 Helm 部署。

  • 首先,您可以复制 Dask Helm Chart 中 values.yaml 文件的内容,并创建一个名为 my-values.yaml 的新文件。
  • 接下来,我们将更新文件中名为 additional_worker_groups 的部分。该部分如下所示
additional_worker_groups: [] # Additional groups of workers to create
# - name: high-mem-workers  # Dask worker group name.
#   resources:
#     limits:
#       memory: 32G
#     requests:
#       memory: 32G
# ...
# (Defaults will be taken from the primary worker configuration)
  • 现在,我们将编辑该部分,使其看起来像这样
additional_worker_groups: # Additional groups of workers to create
  - name: gpu-workers # Dask worker group name.
    replicas: 1
    image:
      repository: rapidsai/rapidsai-core
      tag: 21.12-cuda11.5-runtime-ubuntu20.04-py3.8
      dask_worker: dask-cuda-worker
    extraArgs:
      - --resources
      - "GPU=1"
    resources:
      limits:
        nvidia.com/gpu: 1
  • 现在,我们可以使用我们在 my-values.yaml 中的新值来更新我们的部署。
helm upgrade -f my-values.yaml my-dask dask/dask
  • 再次,您可以运行 kubectl get all -n default,您将看到我们新的 GPU Worker Pod 正在运行。

Dask Cluster Installed with Helm with a GPU worker

  • 现在,我们可以打开一个 Jupyter Notebook 或任何编辑器来编写一些代码。

扩展/收缩 Worker

我们首先从 Dask Kubernetes 导入 HelmCluster 集群管理器。接下来,通过传递 Dask 集群的 release_name 作为参数,我们将集群管理器连接到我们的 Dask 集群。就这样,HelmCluster 会自动将调度器端口转发给我们,并使我们能够快速访问日志。接下来,我们将扩展我们的 Dask 集群。

from dask_kubernetes import HelmCluster
cluster = HelmCluster(release_name="my-dask")
cluster

Dask Cluster with four workers

要扩展我们的集群,我们需要将所需的 Worker 数量作为参数提供给 HelmClusterscale 方法。默认情况下,scale 方法会扩展默认 Worker 组。在第一个示例中您可以看到,我们将默认 Worker 组从三个 Worker 扩展到五个 Worker,总共得到六个 Worker。在第二个示例中,我们使用了方便的 worker_group 关键字参数,将我们的 GPU Worker 组从一个 Worker 扩展到两个 Worker,总共得到七个 Worker。

cluster.scale(5)  # scale the default worker group from 3 to 5 workers
cluster

Dask Cluster with six workers

cluster.scale(2, worker_group = "gpu-workers")  # scale the GPU worker group from 1 to 2 workers
cluster

Dask Cluster with seven cluster

示例:查找 2020 年 4 月纽约市出租车平均出行距离

本示例将使用纽约出租车数据集查找 2020 年 4 月纽约市黄色出租车的平均出行距离。我们将以两种不同的方式计算此距离。第一种方式将使用我们的默认 Dask Worker,第二种方式将利用我们的 GPU Worker 组。我们将在两个示例中都将纽约出租车数据集加载为数据帧,并计算 trip_distance 列的 mean。主要区别在于,我们需要使用我们的 GPU Worker 组来运行 GPU 上的特定计算。我们可以通过使用 Dask 注释来实现这一点。

import dask.dataframe as dd
import dask

link = "https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2020-04.csv"
ddf = dd.read_csv(link, assume_missing=True)
avg_trip_distance = ddf['trip_distance'].mean().compute()
print(f"In January 2021, the average trip distance for yellow taxis was {avg_trip_distance} miles.")

with dask.annotate(resources={'GPU': 1}):
    import dask_cudf, cudf
    dask_cdf = ddf.map_partitions(cudf.from_pandas)
    avg_trip_distance = dask_cdf['trip_distance'].mean().compute()
    print(f"In January 2021, the average trip distance for yellow taxis was {avg_trip_distance} miles.")

总结

就是这样!我们使用 Helm 部署了 Dask,创建了一个额外的 GPU Worker 类型,并使用我们的 Worker 运行了一个使用纽约出租车数据集的示例计算。我们学到了几件新事物

  1. Dask Helm Chart 允许您创建具有不同 Worker 类型的多个 Worker 组。我们创建两种不同的 Dask Worker 组时看到了这一点:CPU Worker 和 GPU Worker。
  2. 您可以使用注释在您选择的 Worker 上运行特定的计算。我们的示例在我们的 GPU Worker 组上使用 RAPIDS 库 cudfdask_cudf 计算了平均出租车距离。
  3. Dask Kubernetes 中的 HelmCluster 集群管理器让您可以从 Python 快速扩展您的 Worker 组。我们通过在 HelmCluster 的 scale 方法中方便地将 Worker 组名称作为关键字参数传递,来扩展了我们的 GPU Worker 组。

未来工作

我们在 Dask 社区中对 Worker 组的概念进行了很多思考。到目前为止,大多数 Dask 部署都是同构的 Worker,但随着 Dask 用户更深入地使用 Dask,对具有专用 Worker 的异构集群的需求越来越大。因此,我们希望在整个 Dask 中添加 Worker 组。


博客评论由 Disqus 提供支持