以下代码创建并处理了 2 TB 的随机生成数据。

import dask.array as da

rs = da.random.RandomState()
x = rs.normal(10, 1, size=(500000, 500000), chunks=(10000, 10000))
(x + 1)[::2, ::2].sum().compute(scheduler='threads')

在单个 CPU 上,此计算需要两个小时。

在八个 GPU 的单节点系统上,此计算需要十九秒。

结合 Dask Array 与 CuPy

实际上,这个计算并没有那么令人印象深刻。它是一个简单的工作负载,大部分时间都花在创建和销毁随机数据上。计算和通信模式都很简单,反映了数据处理工作负载中常见的简单性。

真正令人印象深刻的是,我们能够通过组合这四个现有库,快速创建一个分布式并行 GPU 数组

  1. CuPy 提供了 NumPy 在 GPU 上的部分实现。

  2. Dask Array 在 NumPy 和 CuPy 等类似 NumPy 的库之上提供了分块算法。

    这使我们能够通过分块处理数据,操作比内存容纳量更多的数据。

  3. The Dask distributed 任务调度器并行运行这些算法,轻松协调多个 CPU 核心上的工作。

  4. The Dask CUDA 用于扩展 Dask distributed,增加 GPU 支持。

这些工具已经存在。我们只需要用少量胶水代码和微小修改将它们连接起来。通过将这些工具结合使用,我们可以快速构建和切换不同的架构,以探索最适合我们应用程序的选择。

对于这个例子,我们依赖了上游的以下更改

单/多 CPU/GPU 对比

我们现在可以轻松地在不同架构上运行一些实验。这之所以容易是因为……

  • 我们可以通过在 NumPy 和 CuPy 之间切换来在 CPU 和 GPU 之间切换。
  • 我们可以通过切换 Dask 的不同任务调度器,在单核/多核 CPU 和单 GPU/多 GPU 之间切换。

这些库使我们能够快速评估以下硬件选择的计算成本

  1. 单线程 CPU
  2. 40 核多线程 CPU (80 H/T)
  3. 单 GPU
  4. 单机多 GPU (8 个 GPU)

我们将在下面介绍这四种选择的代码,但首先,我们先展示一个结果表。

结果

架构 时间
单 CPU 核 2 小时 39 分钟
四十个 CPU 核 11 分钟 30 秒
一个 GPU 1 分钟 37 秒
八个 GPU 19 秒

设置

import cupy
import dask.array as da

# generate chunked dask arrays of mamy numpy random arrays
rs = da.random.RandomState()
x = rs.normal(10, 1, size=(500000, 500000), chunks=(10000, 10000))

print(x.nbytes / 1e9)  # 2 TB
# 2000.0

CPU 计时

(x + 1)[::2, ::2].sum().compute(scheduler='single-threaded')
(x + 1)[::2, ::2].sum().compute(scheduler='threads')

单 GPU 计时

我们通过将数据源更改为生成 CuPy 数组而不是 NumPy 数组来从 CPU 切换到 GPU。其他一切都应该或多或少地以相同的方式工作,无需对 CuPy 进行特殊处理。

(这实际上还不完全正确,dask.array 中的许多功能对于非 NumPy 数组会失效,但我们正在 Dask、NumPy 和 GPU 数组库内部积极解决这个问题。不过,本示例中的所有内容都能正常工作。)

# generate chunked dask arrays of mamy cupy random arrays
rs = da.random.RandomState(RandomState=cupy.random.RandomState)  # <-- we specify cupy here
x = rs.normal(10, 1, size=(500000, 500000), chunks=(10000, 10000))
(x + 1)[::2, ::2].sum().compute(scheduler='single-threaded')

多 GPU 计时

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster()
client = Client(cluster)

(x + 1)[::2, ::2].sum().compute()

再说一次,以下是结果

架构 时间
单 CPU 核 2 小时 39 分钟
四十个 CPU 核 11 分钟 30 秒
一个 GPU 1 分钟 37 秒
八个 GPU 19 秒

首先,这是我第一次使用 40 核系统。我很惊讶看到这么多核心。我也很高兴看到 Dask 的常规线程调度器能够愉快地让许多核心达到饱和。

虽然后来它确实下降到大约 5000-6000% 的使用率,如果你计算一下,你会发现我们并没有获得 40 倍的加速。我猜测,如果我们尝试混合使用线程和进程,比如使用十个进程,每个进程八个线程,性能会有所提高。

然而,从最大的多核 CPU 到单个 GPU 的飞跃仍然是一个数量级。跳到多 GPU 又是一个数量级,将计算时间缩短到 19 秒,这足够短,我愿意等它完成再离开我的电脑。

实际上,在仪表盘上观看非常有趣(特别是在等待了三个小时顺序解法运行之后)

结论

这个计算很简单,但我们刚刚探索的架构范围很广。我们从 CPU 切换到 GPU(后者拥有完全不同的代码库),尝试了多核 CPU 并行以及多 GPU 多核并行。

我们在不到二十行代码内完成了这项工作,使这个实验成为本科生或其他新手在家就能进行的任务。我们正接近这样一个节点,即多 GPU 系统的实验对非专业人士(至少在数组计算方面)来说变得平易近人。

这里是上面实验的 Notebook

改进空间

我们可以在多个方向上努力扩展上述计算。要使其可靠,我们还有大量工作要做。

  1. 使用更复杂的数组计算工作负载

    Dask Array 算法最初是围绕 NumPy 设计的。我们最近才开始使其更通用,以支持其他类型的数组(如 GPU 数组、稀疏数组等)。因此,在探索这些非 NumPy 工作负载时仍然存在许多错误。

    例如,如果你在上面的计算中将 sum 替换为 mean,你会得到一个错误,因为我们的 mean 计算中含有一个容易修复的错误,它完全假定输入是 NumPy 数组。

  2. 使用 Pandas 和 cuDF 代替 NumPy 和 CuPy

    cuDF 库旨在在 GPU 上重新实现 Pandas API,就像 CuPy 重新实现 NumPy API 一样。将 Dask DataFrame 与 cuDF 结合使用需要在双方进行一些工作,但这是完全可行的。

    我相信这里有很多唾手可得的成果。

  3. 改进和迁移 LocalCUDACluster

    上面使用的 LocalCUDAClutster 类是一种实验性的 Cluster 类型,它会在本地创建与你拥有 GPU 数量相同的 worker,并将每个 worker 分配给不同的 GPU。这使得人们可以轻松地在单节点系统上平衡 GPU 负载,而无需考虑太多细节。这似乎是当前生态系统中一个常见的痛点。

    然而,LocalCUDACluster 可能不应该放在 dask/distributed 仓库中(它似乎太 CUDA 特定了),所以可能会移到某个 dask-cuda 仓库。此外,关于如何在 GPU 之上处理并发、在 CPU 核心和 GPU 核心之间平衡等问题,仍然存在许多疑问。

  4. 多节点计算

    没有理由说我们不能通过使用多个多 GPU 节点来进一步加速这类计算。目前可以通过手动设置实现,但我们也应该改进现有的部署解决方案 dask-kubernetesdask-yarndask-jobqueue,使其更容易供希望使用多 GPU 资源集群的非专业人士使用。

  5. 成本

    我运行这个程序的机器很贵。好吧,它的拥有和运营成本远不如传统集群达到这类结果所需的那么高,但它仍然远远超出了爱好者或学生的承受能力。

    在更经济的系统上运行它会很有帮助,以便了解在价格更合理的系统上的权衡。我也应该更多地了解如何在云上配置 GPU。

快来帮忙!

如果你觉得上面的工作很有趣,那就来帮忙吧!有很多唾手可得且影响重大的工作要做。

如果你有兴趣在这些主题上获得报酬,那么考虑申请一份工作吧。NVIDIA 公司正在招聘与 Dask 和 GPU 使用相关的职位。

这是一个相当通用的招聘启事。如果你感兴趣但觉得不太符合要求,请仍然申请,我们会做些调整。


博客评论由 Disqus 提供支持