这项工作得到了 Continuum Analytics摩尔基金会 (Moore Foundation) 数据驱动发现倡议的支持。

总结

我们衡量了 Dask 分布式调度器在不同工作负载下、随着问题规模和集群规模的增加而表现出的性能。这有助于解答关于 Dask 可伸缩性的问题,也有助于读者了解哪些类型的计算能够很好地扩展。

我们将通过几种方式改变我们的计算,以了解它们如何对性能产生影响。我们考虑以下方面:

  1. 计算和通信模式,例如完全并行、完全顺序、大块通信、许多小块通信、最近邻、树形归约和动态图。
  2. 改变任务持续时间,从非常快(微秒级)的任务,到 100 毫秒和 1 秒长的任务。任务越快,中央调度器就越难跟上工作进程的速度。
  3. 改变集群规模,从一个双核工作进程到 256 个双核工作进程,并改变数据集大小,数据集大小与工作进程数量呈线性关系。这意味着我们正在衡量弱伸缩性 (weak scaling)
  4. 改变 tasks、多维 arraysdataframes 之间的 API,它们都属于上述类别,但依赖于不同的内存计算系统,如 NumPy 或 Pandas。

我们将从纯任务的基准测试开始,任务是最灵活的系统,也最容易理解。这将帮助我们理解数组和数据帧的伸缩性限制。

注意:我们没有为这些实验调整基准测试或配置。它们的表现远低于可能达到的水平,但也许代表了新手用户在没有专业知识或未考虑配置的情况下设置集群时可能遇到的情况。

关于基准测试和偏差的说明

如果您时间紧迫,可以安全地跳过本节

这是一份技术文档,而非营销材料。这些基准测试遵循了这篇博文中阐述的原则,并试图避免开发者偏见带来的陷阱。特别是以下几点是真实的:

  1. 我们在集群上运行基准测试之前就确定了一组基准测试
  2. 我们在看到结果后没有改进软件或调整基准测试。这些测试是在几周前发布的当前稳定版 Dask 上运行的,而不是在开发分支上。
  3. 计算是按照新手会编写的方式朴素地构建的。它们没有为了额外性能进行调整。
  4. 集群的配置也很朴素,没有考虑规模或特殊参数。

我们估计,专家使用将比我们看到的结果带来大约 5-10 倍的伸缩性改进。我们将在文章底部详细介绍如何使用专家方法改进伸缩性。

尽管如此,这篇博文的作者是受雇编写这款软件的,所以您可能不应该完全相信他。我们邀请读者独立探索。所有配置、notebook、绘图代码和数据都可在下方找到:


任务 (Tasks)

我们首先对任务调度 API 进行基准测试。Dask 的任务调度 API 是其他“大数据”API(如 dataframes)的核心。我们从任务开始,因为它们是 Dask 最简单、最原始的表示形式。我们主要会在整数上运行以下函数,但您可以在此处填充任何函数,例如 pandas dataframe 方法或 sklearn 例程。

import time

def inc(x):
    return x + 1

def add(x, y):
    return x + y

def slowinc(x, delay=0.1):
    time.sleep(delay)
    return x + 1

def slowadd(x, y, delay=0.1):
    time.sleep(delay)
    return x + y

def slowsum(L, delay=0.1):
    time.sleep(delay)
    return sum(L)

完全并行的任务 (Embarrassingly Parallel Tasks)

我们在集群上运行以下代码并测量它们完成所需的时间

futures = client.map(slowinc, range(4 * n), delay=1) # 1s delay
wait(futures)
futures = client.map(slowinc, range(100 * n_cores)) # 100ms delay
wait(futures)
futures = client.map(inc, range(n_cores * 200))     # fast
wait(futures)

我们看到,对于快速任务,系统每秒可以处理大约 2000-3000 个任务。这主要受限于调度器和客户端的开销。向系统中添加更多工作进程并不能提高每秒处理的任务数量。但是,如果我们的任务花费一定时间(例如 100 毫秒或 1 秒),那么我们会看到不错的加速效果。

如果您将绘图切换到线性刻度,您会看到当我们扩展到 512 个核心时,速度开始降低大约两倍。看到这种行为(基准测试万岁)我感到惊讶,因为 Dask 的所有调度决策都与集群规模无关。我的第一个猜测是调度器可能被管理消息淹没,但我们需要在这里更深入地研究一下。

树形归约 (Tree Reduction)

并非所有计算都是完全并行的。许多计算之间存在依赖关系。考虑树形归约,我们将相邻元素组合,直到只剩下一个。这考验了任务依赖和小数据移动。

from dask import delayed

L = range(2**7 * n)
while len(L) > 1:  # while there is more than one element left
    # add neighbors together
    L = [delayed(slowadd)(a, b) for a, b in zip(L[::2], L[1::2])]

L[0].compute()

我们看到了与完全并行情况类似的伸缩性。性能呈线性增长,直到达到每秒约 3000 个任务,此时开始落后于线性伸缩。Dask 似乎不介意依赖关系,即使是像这种情况下的自定义依赖。

最近邻 (Nearest Neighbor)

最近邻计算在数据分析中很常见,当您需要在相邻元素之间共享少量数据时,例如在数据框中的时间序列计算、数组中的重叠图像处理或 PDE 计算中经常发生的情况。

L = range(20 * n)
L = client.map(slowadd, L[:-1], L[1:])
L = client.map(slowadd, L[:-1], L[1:])
wait(L)

伸缩性与树形归约情况类似。有趣的依赖结构不会带来显著的开销或伸缩成本。

顺序 (Sequential)

我们考虑一个完全不并行,而是高度顺序的计算。增加工作进程数量在这里应该没有帮助(一次只能做一件事),但这确实展示了大量工作进程带来的额外压力。注意,我们为此关闭了任务融合,所以这里我们测量的是调度器和工作进程之间每秒能发生多少次往返。

x = 1

for i in range(100):
    x = delayed(inc)(x)

x.compute()

因此,我们每秒可以进行大约 100 次往返,或者往返延迟大约为 10 毫秒。事实证明,这部分成本相当大一部分是由于一项优化:工作进程更喜欢批量处理小消息以提高吞吐量。在这种情况下,这项优化对我们不利。尽管如此,我们这里的速度仍然比视频帧率快大约 2-4 倍(视频以大约 24 赫兹运行,或帧间间隔 40 毫秒)。

循环中的客户端 (Client in the loop)

最后,我们考虑一个归约操作,它消耗先完成的 future 并将它们相加。这是在计算中使用客户端逻辑的一个例子,这在复杂算法中通常很有帮助。这也稍微更好地伸缩,因为调度器中需要跟踪的依赖关系更少。客户端承担了一部分负载。

from dask.distributed import as_completed
futures = client.map(slowinc, range(n * 20))

pool = as_completed(futures)
batches = pool.batches()

while True:
    try:
        batch = next(batches)
        if len(batch) == 1:
            batch += next(batches)
    except StopIteration:
        break
    future = client.submit(slowsum, batch)
    pool.add(future)

任务:完成 (Tasks: Complete)

我们展示了上面大部分图表以供比较。

数组 (Arrays)

当我们将 NumPy 数组与上面的任务调度系统结合时,我们就得到了 dask.array,一个分布式多维数组。本节展示了与上一节类似的计算(map、归约、最近邻),但现在这些计算是由实际面向数据的计算驱动的,并涉及真实的数据移动。

创建数据集 (Create Dataset)

我们创建一个带有随机数据的方阵。这个数组的大小随核心数量而变化。我们将其切分成大小为 2000x2000 的均匀块。

N = int(5000 * math.sqrt(n_cores))
x = da.random.randint(0, 10000, size=(N, N), chunks=(2000, 2000))
x = x.persist()
wait(x)

创建这个数组是完全并行的。这里的图表有一个奇怪的角落,我无法解释。

元素级计算 (Elementwise Computation)

我们对这个数组进行一些元素级的数值计算。

y = da.sin(x) ** 2 + da.cos(x) ** 2
y = y.persist()
wait(y)

这也是完全并行的。这里的每个任务大约需要 300 毫秒(在单个 2000x2000 的 numpy 数组块上调用此操作所需的时间)。

归约 (Reductions)

我们对数组求和。这被实现为树形归约。

x.std().compute()

随机访问 (Random Access)

我们从数组中获取单个元素。这不应该随着工作进程数量的增加而变快,但可能会变慢,这取决于一个工作进程给调度器增加了多少基准负载。

x[1234, 4567].compute()

我们每秒可以获得大约 400-800 字节,这意味着响应时间为 10-20 毫秒,大约是视频帧率速度的两倍。我们看到,一旦有大约一百个活跃连接,性能确实会下降。

通信 (Communication)

我们将数组与其转置相加。这迫使不同的块在网络中移动,以便它们可以相互相加。大约一半的数组在网络中移动。

y = x + x.T
y = y.persist()
wait(y)

此计算的任务结构类似于最近邻。它具有规则的模式,每个任务连接数量很少。这实际上更多是对网络硬件的测试,我们看到它没有施加任何额外的伸缩限制(这看起来像正常的略低于线性的伸缩)。

重分块 (Rechunking)

有时通信由许多小传输组成。例如,如果您有一个图像时间序列,其中每个图像是一个块,您可能希望重新分块数据,以便每个像素的所有时间值都位于一个块中。这样做可能非常具有挑战性,因为每个输出块都需要来自每个输入块的一小部分数据,从而可能导致 n 平方次的传输。

y = x.rechunk((20000, 200)).persist()
wait(y)

y = y.rechunk((200, 20000)).persist()
wait(y)

这种计算可能非常困难。我们看到 dask 执行它的速度比归约等快速计算要慢,但它仍然可以很好地扩展到数百个工作进程。

最近邻 (Nearest Neighbor)

Dask.array 包含重叠相邻块小部分的能力,以支持需要一定连续性的函数,例如导数或空间平滑函数。

y = x.map_overlap(slowinc, depth=1, delay=0.1).persist()
wait(y)

数组:完成 (Array Complete)

数据帧 (DataFrames)

我们可以将 Pandas 数据帧与 Dask 结合,得到 Dask 数据帧,即分布式表格。本节将与上一节关于数组的内容非常相似,但将重点关注 pandas 风格的计算。

创建数据集 (Create Dataset)

我们创建一个包含随机整数的数组,有十列,每核心两百万行,但切分成大小为一百万的块。我们将此转换成一个整数数据帧。

x = da.random.randint(0, 10000, size=(N, 10), chunks=(1000000, 10))
df = dd.from_dask_array(x).persist()
wait(df)

元素级 (Elementwise)

我们可以执行 100 毫秒的任务或尝试一系列算术运算。

y = df.map_partitions(slowinc, meta=df).persist()
wait(y)
y = (df[0] + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10).persist()
wait(y)

随机访问 (Random access)

类似地,我们可以尝试使用 loc 进行随机访问。

df.loc[123456].compute()

归约 (Reductions)

我们可以尝试对整个数据集或单个序列进行归约。

df.std().compute()
df[0].std().compute()

Groupby 聚合,例如 df.groupby(...).column.mean(),操作方式与归约非常相似,只是稍微复杂一些。

df.groupby(0)[1].mean().compute()

洗牌/混洗 (Shuffles)

然而,像 df.groupby(...).apply(...) 这样的操作要困难得多,因为我们实际上需要构建组。这需要对所有数据进行完全洗牌 (shuffle),这可能非常昂贵。

这与我们排序或调用 set_index 时发生的操作相同。

df.groupby(0).apply(len).compute()  # this would be faster as df.groupby(0).size()
y = df.set_index(1).persist()
wait(y)

它的性能仍然不错,并且可以很好地扩展到大约一百个工作进程。

时间序列操作 (Timeseries operations)

时间序列操作通常需要最近邻计算。这里我们查看滚动聚合,但累积操作、重采样等都非常相似。

y = df.rolling(5).mean().persist()
wait(y)

数据帧:完成 (Dataframes: Complete)

分析 (Analysis)

让我们从几个主要观察开始:

  1. 单个任务花费的时间越长,Dask(或任何分布式系统)的伸缩性就越好。随着工作进程数量的增加,您也应该努力增加平均任务大小,例如通过增加数组块或数据帧分区的内存大小。
  2. Dask 调度器 + 客户端目前每秒最多处理约 3000 个任务。换句话说,如果我们的计算需要 100 毫秒,那么我们可以饱和大约 300 个核心,这与我们在这里观察到的情况差不多。
  3. 在适度的情况下,例如在归约或最近邻计算中,添加依赖关系通常是免费的。只要并行性仍然充足,您的依赖关系结构是什么样子并不重要。
  4. 添加更实质性的依赖关系,例如在数组重分块或数据帧洗牌中,可能会更昂贵,但 dask 集合算法(array, dataframe)的设计旨在即使在大规模下也能保持伸缩性。
  5. 调度器似乎在 256 个工作进程时变慢,即使是长任务

长度也是如此。这表明我们可能存在一个需要解决的开销问题。

专家方法 (Expert Approach)

因此,根据我们在这里的经验,现在让我们调整设置以使 Dask 运行良好。我们希望避免两件事:

  1. 大量独立的工人进程
  2. 大量小任务

所以让我们做一些改变:

  1. 更大的工作进程:与其使用 256 个双核工作进程,不如部署 32 个十六核工作进程。
  2. 更大的块:与其使用 2000x2000 的 numpy 数组块,不如将其增加到 10,000x10,000。

    与其使用 1,000,000 行的 Pandas 数据帧分区,不如将其增加到 10,000,000。

    这些大小仍然在舒适的内存限制内。在我们的例子中,每个大约是一个千兆字节。

当我们进行这些更改时,我们发现所有指标在更大规模下都得到了改善。下面表格中列出了一些显著的改进(抱歉在这种情况下没有漂亮的图表)。

基准测试小规模大规模单位
任务:完全并行 3500 3800 任务/秒
数组:元素级 sin(x)**2 + cos(x)**2 2400 6500 MB/秒
数据帧:元素级算术 9600 66000 MB/秒
数组:重分块 4700 4800 MB/秒
数据帧:设置索引 1400 1000 MB/秒

我们看到,对于某些操作,我们可以获得显著的改进(dask.dataframe 现在每秒处理数据达到 60),而对于主要受调度器或网络限制的其他操作,这并不能显著改善情况(有时甚至会恶化)。

尽管如此,即使在朴素的设置下,我们也能在适度的集群上轻松地每秒处理数十 GB 的数据。这些速度适用于非常广泛的计算。

总结思考

希望这些笔记能帮助人们理解 Dask 的伸缩性。像所有工具一样,它有其局限性,但即使在正常设置下,Dask 也应该能很好地扩展到大约一百个工作进程。一旦达到这个限制,您可能需要开始考虑其他因素,特别是每个工作进程的线程数和块大小,这两者都有助于将性能推向数千个核心的范围。

随附的 notebook 是独立的,其中包含运行和计时计算以及生成 Bokeh 图表的代码。我很想看到其他人使用不同的硬件或设置重现这些(或其他的!)基准测试。

工具 (Tooling)

这篇博文使用了以下工具:

  1. Dask-kubernetes:用于在 Google 计算引擎上部署不同规模的集群
  2. Bokeh:用于绘图(图库
  3. gcsfs:用于 Google 云存储上的存储

博客评论由 Disqus 提供支持