Dask 允许你构建计算图,然后为你并行执行。这对于充分利用计算机硬件非常有用。当你想要突破单机限制时,它也同样非常有用。

在本文中,我们将介绍

手动设置

让我们通过介绍设置分布式 Dask 集群最直接的方法来深入了解。

设置调度器和工作进程

假设我们有三台计算机,我们将它们称为 MachineAMachineBMachineC。每台机器都有一个可用的 Python 环境,并且我们已经使用 conda install dask 安装了 Dask。如果想将它们组合成一个 Dask 集群,我们首先在 MachineA 上运行一个调度器。

$ dask-scheduler
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Local Directory: /tmp/scheduler-btqf8ve1
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at: tcp://MachineA:8786
distributed.scheduler - INFO -   dashboard at:               :8787

接下来,我们需要在 MachineBMachineC 上启动一个工作进程。

$ dask-worker tcp://MachineA:8786
distributed.nanny - INFO -         Start Nanny at:    'tcp://127.0.0.1:51224'
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:51225
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:51225
distributed.worker - INFO -          dashboard at:            127.0.0.1:51226
distributed.worker - INFO - Waiting to connect to:        tcp://MachineA:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          4
distributed.worker - INFO -                Memory:                    8.00 GB
distributed.worker - INFO -       Local Directory:       /tmp/worker-h3wfwg7j
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:        tcp://MachineA:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection

如果在我们的两台空闲机器上都启动一个工作进程,Dask 将自动检测机器上的资源并将其提供给调度器。在上面的示例中,工作进程检测到 4 个 CPU 核和 8GB RAM。因此,我们的调度器总共可以访问 8 个核和 16GB RAM,并将利用这些资源尽快运行完计算图。如果我们在更多机器上添加更多工作进程,则调度器可用的资源量会增加,计算时间应该会更快。

注意:虽然调度器机器可能拥有与另外两台机器相同的资源,但这些资源不会用于计算。

最后,我们需要从我们的 Python 会话连接到调度器。

from dask.distributed import Client
client = Client("tcp://MachineA:8786")

在 Python 全局命名空间中创建此 Client 对象意味着你执行的任何 Dask 代码都将检测到它,并将计算任务交给调度器,然后调度器将在工作进程上执行计算。

访问仪表板

Dask 分布式调度器还有一个仪表板,可以在网页浏览器中打开。正如你在上面的输出中看到的,默认位置是在调度器机器上的端口 8787。所以你应该能够导航到 http://MachineA:8787

Dask dashboard

如果你使用 Jupyter Lab 作为 Python 环境,你还可以使用 Dask Lab Extension 将仪表板中的单个图表作为窗口在 Jupyter Lab 中打开。

Dask Lab Extension

回顾

在这个最小示例中,我们在一些机器上安装了 Dask,在一台机器上运行了分布式调度器,并在其他机器上运行了工作进程。然后我们从 Python 会话连接到集群,并打开仪表板来监视集群。

我们尚未涵盖的是这些机器最初是从哪里来的。在本文的其余部分,我们将讨论人们通常在实际环境中运行集群的不同方式,并概述现有的各种工具有助于你在各种基础设施上设置 Dask 集群。

集群要求

要运行 Dask 集群,你必须能够在机器上安装 Dask 并启动调度器和工作进程组件。这些机器需要能够通过网络通信,以便这些组件可以相互通信。

你还需要能够通过网络从 Python 会话访问调度器,以便连接 Client 并访问仪表板。

最后,你在创建 Client 的 Python 会话中的 Python 环境必须与工作进程运行的 Python 环境匹配。这是因为 Dask 使用 cloudpickle 来序列化对象并将其发送给工作进程,以及检索结果。因此,这两个位置的包版本必须匹配。

在讨论人们通常希望在其上运行 Dask 的不同平台时,我们需要记住这些要求。

集群类型

我通常看到人们运行两种“类型”的集群:固定集群和临时集群。

固定集群

一种常见的集群设置方法是按照上述说明运行调度器和工作进程命令,但让它们无限期地运行。为了本文的目的,我将此称为“固定集群”。你可以使用诸如 systemdsupervisord 之类的工具来管理这些进程,并确保它们始终在机器上运行。然后可以将 Dask 集群视为一项服务。

在这种模式下,一旦集群设置好,人们就可以启动他们的 Python 会话,将客户端连接到这个现有集群,完成一些工作然后再次断开连接。他们之后可能会回到该集群并运行更多工作。与此同时,集群将处于空闲状态。

在这种模式下,多个用户共享一个集群也很常见,但不建议这样做,因为 Dask 调度器不单独管理用户或客户端,工作将按照先来先服务的原则执行。因此,我们建议用户一次使用一个集群。

临时集群

临时集群是指仅在工作期间存在的集群。在这种情况下,用户可以 SSH 到机器上,运行命令设置集群,连接客户端并执行工作,然后断开连接并退出 Dask 进程。一个基本的方法是创建一个 bash 脚本,调用 ssh 并设置集群。你将在执行工作时在后台运行此脚本,并在完成后将其终止。我们将在接下来的部分中介绍此方法的其他实现。

临时集群允许你利用大量机器,并在工作完成后再次释放它们。当你使用云服务或批处理调度器等系统时,这尤其有用,因为你的信用额度有限,或者需要为你配置的资源付费。

自适应性

临时集群通常也更容易扩展,因为你很可能有自动启动工作进程的机制。Dask 调度器维护一个预计未完成工作需要多长时间才能完成的估计值。如果调度器有启动和停止工作进程的机制,那么它将扩展工作进程,以便在 5 秒内完成所有未完成的工作。这称为自适应模式。

启动和停止工作进程的机制是通过插件添加的。我们将要讨论的许多实现都包含此逻辑。

连接性

Dask 默认使用 TCP 在客户端、调度器和工作进程之间进行通信。这意味着所有这些组件必须位于一个具有开放路由的 TCP/IP 网络上,以便机器之间能够相互通信。许多连接问题源于防火墙或私有网络阻塞了某些组件之间的连接。一个例子是在 AWS 等云平台上运行 Dask,但在咖啡馆使用免费 wifi 时在笔记本电脑上运行 Python 会话和客户端。你必须确保能够路由组件之间的流量,要么将 Dask 集群暴露到互联网,要么通过 VPN 或隧道将你的笔记本电脑连接到私有网络。

Dask 还正在进行添加对 UCX 的支持的工作,这将使其能够在有 InfiniBand 或 NVLink 网络的地方使用它们。

集群管理器

在接下来的部分中,我们将介绍 Dask 社区中可用的各种集群管理器实现。

在 Dask 分布式代码库中,有一个 Cluster 超类,可以被继承用于构建各种平台的集群管理器。社区成员已经利用这个超类构建了自己的包,可以在特定平台(例如 Kubernetes)上创建 Dask 集群。

这些类的设计理念是你在 Python 会话中导入集群管理器并实例化它。然后该对象负责在目标平台上启动调度器和工作进程。之后,你可以像往常一样从该集群对象创建一个 Client 对象来连接到它。

所有这些集群管理器对象都是临时集群,它们只存在于 Python 会话期间,然后将被清理。

本地集群

让我们从 Dask 分布式代码库中 Cluster 的一个参考实现 LocalCluster 开始。

from dask.distributed import LocalCluster, Client

cluster = LocalCluster()
client = Client(cluster)

此集群管理器在你的本地机器上启动一个调度器,然后为机器上找到的每个 CPU 核启动一个工作进程。

SSH 集群

另一个参考实现是 SSHCluster。这是使用 Dask 分布式与多台机器配合的最纯粹、最简单的方式之一,与本文开头的示例非常相似。

from dask.distributed import SSHCluster, Client

cluster = SSHCluster(["MachineA", "MachineB", "MachineC"])
client = Client(cluster)

这里的第一个参数是一个机器列表,我们可以 SSH 进去并在其上设置 Dask 集群。列表中的第一台机器将用作调度器,其余机器用作工作进程。

由于调度器使用的资源可能比工作进程少得多,你甚至可以将其在本地运行,并利用所有三台远程机器作为工作进程。

cluster = SSHCluster(["localhost", "MachineA", "MachineB", "MachineC"])

SpecCluster

Dask 分布式核心库中包含的最后一个实现是 SpecCluster。这实际上是另一个超类,旨在供其他开发者在构建集群管理器时继承。但是,它比 Cluster 更进一步,要求开发者以 Python 类的形式提供调度器和工作进程的完整规范。还有一个名为 ProcessInterface 的超类,旨在用于创建这些调度器和工作进程类。

拥有标准接口意味着用户体验更一致。我们接下来介绍的许多集群管理器都使用 SpecCluster

Dask Kubernetes

Dask KubernetesKubernetes 提供了一个名为 KubeCluster 的集群管理器。

Kubernetes 提供高级 API 和抽象概念,用于在机器集群上调度 Linux 容器。它提供了进程、容器、网络、存储等的抽象概念,以便更好地利用数据中心规模的资源。

作为 Dask 用户,通常集群如何设置对你来说并不重要。但如果你的组织或机构为你提供了 Kubernetes 集群的访问权限,你需要理解这些概念才能在其上调度工作。

KubeCluster 集群管理器进一步将这些概念抽象为我们熟悉的 Dask 术语。

from dask.distributed import Client
from dask_kubernetes import KubeCluster

cluster = KubeCluster(**cluster_specific_kwargs)
client = Client(cluster)

为了使这段代码工作,你需要配置你的 Kubernetes 凭据,就像在 SSH 示例中需要配置密钥一样。

你的客户端还需要能够访问 Dask 调度器,并且你可能希望能够在浏览器中打开仪表板。然而,Kubernetes 使用覆盖网络,这意味着分配给调度器和工作进程的 IP 地址只能在集群内部路由。这对于它们之间相互通信没问题,但意味着你无法从外部访问。

解决这个问题的一种方法是确保你的 Python 会话也在 Kubernetes 集群内部运行。在 Kubernetes 上设置交互式 Python 环境的一种流行方法是使用 Zero to Jupyter Hub,它让你能够访问在 Kubernetes 集群内部运行的 Jupyter notebook。

另一种方法是将你的调度器暴露到外部网络。你可以通过暴露与调度器关联的 Kubernetes Service 对象来实现,或者为你的 Kubernetes 集群设置和配置 Ingress 组件。这两个选项都需要一些 Kubernetes 知识。

Dask Helm Chart

在 Kubernetes 集群上运行 Dask 的另一个选项是使用 Dask Helm Chart

这是一个固定集群设置的示例。Helm 是一种在 Kubernetes 集群上安装特定资源的方式,类似于 aptyum 等包管理器。Dask Helm Chart 包括一个 Jupyter notebook、一个 Dask 调度器和三个 Dask 工作进程。工作进程可以通过与 Kubernetes API 交互手动伸缩,但不能由 Dask 调度器本身自适应伸缩。

这感觉与我们目前所见的方法不同。它为你提供了一个始终可用的 Dask 集群,以及一个 Jupyter notebook 来驱动集群。然后你必须将你的工作带到集群的 Jupyter 会话中,而不是从现有的工作环境生成一个集群。

这种方法的一个好处是,由于 Jupyter notebook 是作为集群的一部分设置的,它已经安装了 Lab Extension,并且已经预配置了 Dask 集群的位置。因此,与之前需要为 Client 提供调度器地址或 Cluster 对象不同,在此情况下,它将从 Helm chart 设置的环境变量中自动检测集群。

from dask.distributed import Client

client = Client()  # The address is loaded from an environment variable

注意:如果在调度器位置未配置的其他情况下调用 Client 时不带任何参数,它将自动创建一个 LocalCluster 对象并使用它。

Dask Jobqueue

Dask Jobqueue 是一套面向 HPC 用户的集群管理器。

当作为研究人员或学者,可以使用 HPC 或超级计算机时,你很可能需要通过某种作业排队系统向该机器提交工作。这通常以 bash 脚本的形式出现,其中包含关于你在机器上需要多少资源以及要运行的命令的元数据。

Dask Jobqueue 提供了用于 PBSSlurmSGE 的集群管理器对象。创建这些集群管理器时,它们将根据你的参数为批处理调度器构建脚本,并使用你的默认凭据提交它们。

from dask.distributed import Client
from dask_jobqueue import PBSCluster

cluster = PBSCluster(**cluster_specific_kwargs)
client = Client(cluster)

由于像这样的批处理系统通常等待时间较长,你可能无法立即访问集群对象,并且伸缩会很慢。根据排队策略,最好将其视为固定大小的集群。但是,如果你有一个响应迅速的交互式队列,那么你可以像使用任何其他自动伸缩集群管理器一样使用它。

同样,期望你的 Python 会话能够连接到调度器的 IP 地址。如何确保这一点可能因你的 HPC 中心设置而异。

Dask Yarn

Dask Yarn 是传统 Hadoop 系统的集群管理器。

Hadoop 是一个允许使用简单编程模型在计算机集群上分布式处理大数据集的框架。它是 Java/Scala 生态系统中用于处理大量数据的常见基础设施。但是,你也可以使用名为 YARN 的调度功能来调度 Dask 工作进程并利用底层硬件资源。

from dask.distributed import Client
from dask_yarn import YarnCluster

cluster = YarnCluster(**cluster_specific_kwargs)
client = Client(cluster)

Dask Yarn 仅适用于从 Hadoop 边缘节点使用,该节点可以访问 Hadoop 集群的内部网络。

Dask Cloudprovider

Dask Cloudprovider 是一组利用云原生 API 的集群管理器。

云提供商,如 AmazonMicrosoftGoogle,提供了许多 API,用于构建和运行各种类型的基础设施。这些基础设施涵盖了运行 Linux 或 Windows 的传统虚拟服务器,到可以按需执行小段代码的更高级 API。它们提供批处理系统、Hadoop 系统、机器学习系统等等。

在云提供商上运行 Dask 的理想场景是一个服务,它允许你运行指定 Python 环境的调度器和工作进程,然后从外部安全地连接到它们。这样的服务尚不存在,但程度不同的类似服务确实存在。

一个例子是 AWS Fargate,它是一个托管容器平台。你可以按需运行 Docker 容器,每个容器都有一个唯一的 IP 地址,可以是公共的或私有的。这意味着我们可以在 Dask 容器中运行 Dask 调度器和工作进程,并从我们的 Python 会话连接到它们。这项服务按请求资源的秒数计费,因此作为临时服务最有意义,在你未使用时没有成本。

from dask.distributed import Client
from dask_cloudprovider import FargateCluster

cluster = FargateCluster(**cluster_specific_kwargs)
client = Client(cluster)

此集群管理器使用你的AWS 凭据在 Fargate 上进行身份验证并请求 AWS 资源,然后将你的本地会话连接到云上运行的 Dask 集群。

甚至还有更高级的服务,例如 AWS LambdaGoogle Cloud Functions,它们允许你按需执行代码并按代码执行时间计费。这些服务被称为“无服务器”服务,因为服务器被完全抽象化了。这对我们的 Dask 集群来说是完美的,因为你可以将调度器和工作进程作为代码提交运行。然而,当运行这些云函数时,它们之间无法建立网络连接,因为它们没有可路由的 IP 地址,因此无法由这些执行函数构建 Dask 集群。也许将来有一天可以实现!

Dask Gateway

Dask Gateway 是一个用于管理 Dask 集群的中央服务。它提供了一个安全的 API,多个用户可以与其通信以请求 Dask 服务器。它可以在 Kubernetes、Yarn 或批处理系统上生成 Dask 集群。

此工具面向希望让其用户创建 Dask 集群,但希望保持一些集中控制,而不是每个用户自行创建的 IT 管理员。这对于跟踪 Dask 使用情况和设置每用户限制也很有用。

from dask.distributed import Client
from dask_gateway import GatewayCluster

cluster = GatewayCluster(**cluster_specific_kwargs)
client = Client(cluster)

对于每个用户,创建和使用网关集群的命令是相同的。设置和管理网关服务器以及配置通过 Kerberos 或 Jupyter Hub 进行身份验证取决于管理员。他们还应该向用户提供配置,以便 Dask Gateway 知道如何连接到网关服务器。在大型组织或机构中,IT 部门也可能负责提供员工使用的机器,因此应该能够将配置文件放置到用户的计算机上。

本地 CUDA 集群

我要介绍的最后一个集群管理器是 Dask CUDA 包中的 LocalCUDACluster

这与其他集群管理器略有不同,因为它构建的 Dask 集群专门针对单个硬件进行优化。在这种情况下,它针对具有 GPU 的机器,范围从带有板载 NVIDIA GPU 的笔记本电脑,到在你的数据中心运行带有多个 GPU 的 NVIDIA DGX-2

此集群管理器与 LocalCluster 非常相似,它在当前机器上本地创建资源,但不是为每个 CPU 核创建一个工作进程,而是为每个 GPU 创建一个。它还修改了一些默认配置,以确保 GPU 工作负载的良好性能。

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster(**cluster_specific_kwargs)
client = Client(cluster)

此软件包还有一个名为 dask-cuda-worker 的替代 Dask 工作进程 bash 命令,该命令也修改了 Dask 工作进程的默认设置,以确保其针对 GPU 工作进行优化。

未来

现在我们已经阐述了 Dask 分布式集群生态系统的现状,接下来让我们讨论未来的发展方向。

如开头所示,Dask 集群是调度器、工作进程和客户端的组合,可实现 Python 函数的分布式执行。在自己的机器上设置自己的集群很简单,但由于基础设施的配置方式多种多样,我们现在有多种自动化此过程的方法。

这种多样性引发了关于如何改进的一些问题。

我们需要更多固定集群选项吗?

在介绍各种集群管理器时,我们只介绍了一种固定集群实现:Helm chart。是否有对更多固定集群的需求?例如,遵循与 Helm chart 相同结构的 CloudFormationTerraform 模板,提供 Jupyter 服务、Dask 调度器和固定数量的工作进程。

我们能否弥合一些差距?

Dask Kubernetes 集群管理器能否连接到使用 Helm chart 构建的现有集群,然后执行自适应伸缩?我经常被问到这个问题,但目前尚不清楚如何实现。集群管理器和 Helm chart 使用不同的 Kubernetes 资源来实现相同目标,因此在这成为可能之前需要进行一些统一。

临时集群是否过于临时?

许多集群管理器仅在 Python 会话期间存在。然而,有些(例如 YarnCluster)允许你从集群断开连接并重新连接。这使得你可以将 YARN 集群更像固定集群一样对待。

在其他情况下,Python 会话可能有超时或限制,并且可能在 Dask 集群完成工作之前被终止。让 Dask 集群继续存在是否有益?随着 Python 会话被清理,客户端和 Futures 也将被垃圾回收。所以也许没有。

我们能更好地管理 conda 环境吗?

目前,创建集群的人有责任确保工作进程的 conda 环境与将创建 Client 的环境匹配。在固定集群上,这可能更容易,因为 Python/Jupyter 环境可以在同一套系统内提供。然而,在临时集群上,你可能会连接到云或批处理系统,它们的环境可能与你的笔记本电脑环境不匹配,例如。

也许可以在工作进程和 conda 之间进行集成,以便动态创建环境。探讨这方面的性能影响会很有趣。

另一种选择是允许用户在工作进程上启动远程 Jupyter 内核。他们将无法访问相同的文件系统,但将共享一个 conda 环境。


博客评论由 Disqus 提供支持