摘要

本文讨论了 Dask 在任务调度方面的开销成本,并提出了一个初步的加速计划。

本文写给其他维护者阅读,经常涉及内部细节。它不适合大众阅读。

这个问题是如何表现出来的?

当我们提交大型图时,从调用 .compute() 到工作真正在 worker 上开始之间会有一点延迟。在某些情况下,这种延迟会影响可用性和性能。

此外,在极少数情况下,任务之间的间隔也可能成为问题,特别是当这些任务非常短且由于某种原因无法延长时。

谁会关心?

首先,这个问题影响大约 1-5% 的 Dask 用户。这些人希望相对快速地处理数百万个任务。让我们列举一些用例:

  1. 10-100TB 规模的 Xarray/Pangeo 工作负载
  2. 在大型表格数据上的 NVIDIA RAPIDS 工作负载(GPU 使计算加速,因此其他成本相对变高)
  3. 一些对冲基金内部的一些神秘用例

它不影响处理 100GB 到几 TB 数据,且不介意等待 10 秒让事情开始运行的日常用户。

成本粗略分解

当你调用 x.sum().compute() 时,会发生几件事

  1. 图生成:Dask 集合库中的一些 Python 代码(例如 dask array)调用 sum 函数,这会在客户端生成一个任务图。
  2. 图优化:然后我们在客户端优化该图,以移除不必要的工作,融合任务,应用重要的高级优化等等。
  3. 图序列化:现在我们将该图打包成可以发送到调度器的形式。
  4. 图通信:我们将这些字节通过网络发送到调度器。
  5. Scheduler.update_graph:调度器接收这些字节,解包它们,然后更新其内部数据结构。
  6. 调度:调度器然后将就绪任务分配给 workers。
  7. 与 workers 通信:调度器向每个 worker 发送许多更小的消息,包含他们可以执行的任务。
  8. Workers 工作:Workers 然后执行这些工作,并开始与调度器来回通信以接收新指令。

通常,现在大多数人关注步骤 1-6。一旦事情传到 workers 并开始显示进度条,人们往往会稍微不那么关心了(但也不是完全不关心)。

其他项目如何处理?

让我们看看其他一些项目是如何做的,看看是否有我们可以学习的地方。这些是常见的建议,但大多数都存在挑战。

  1. 使用 C++/Rust/C/Cython 重写调度器

    提案:Python 慢。想让它更快?不要用 Python。参见学术项目。

    挑战:这对于上述管道中的某些部分有意义,但对其他部分则不然。这也使得吸引维护者变得更困难。

    我们应该考虑的是:调度器和优化算法的某些部分可以用较低级语言编写,例如 Cython。我们需要注意可维护性。

  2. 分布式调度

    提案:调度器慢,也许可以有很多调度器?参见 Ray。

    挑战:如果调度状态分布在多台计算机上,Dask 需要做出的决策类型实际上很难实现。当工作负载非常均匀或高度解耦时,分布式调度效果更好。分布式调度对喜欢解决有趣/难题的人非常有吸引力。

    我们应该考虑的是:我们可以将一些简单的逻辑下移到 workers。虽然我们已经对简单的事情这样做了。不清楚这里还有多少额外的好处。

  3. 围绕集合构建专门调度

    提案:如果 Dask 只成为一个 dataframe 库或只成为一个数组计算库,那么它可以更有效地特殊处理某些事情。参见 Spark、Mars 和其他项目。

    挑战:是的,但 Dask 不是一个 dataframe 库或一个数组库。我们上面提到的三个用例都非常不同。

    我们应该考虑的是:像 dask array 和 dask dataframe 这样的模块应该开发高级查询块,我们应该努力直接通过网络传输这些子图,以便它们更紧凑。

我们到底应该做什么?

因为我们的管道有许多阶段,每个阶段可能因不同的原因而慢,所以我们必须做很多事情。此外,这是一个难题,因为在这个层面改变项目的一个部分会对其他许多部分产生影响。本文的其余部分试图阐述一套一致的修改方案。我们先从摘要开始

  1. 对于 Dask array/dataframe,让我们更积极地使用高级图,以便我们可以在客户端和调度器之间只通信抽象表示。
  2. 但这会破坏低级图优化,特别是融合、剪枝和切片融合。我们可以通过两项修改使其不再必要。
    • 我们可以让高级图变得更智能,以处理剪枝和切片融合。
    • 我们可以将更多的调度工作下移到 workers,以在那里复制低级融合的优势。
  3. 然后,一旦所有的图操作都发生在调度器上,让我们尝试加速它,最好使用当前开发者社区能够理解的语言,比如 Cython。
  4. 同时并行地,让我们检查一下我们的网络栈。

我们将在下面更深入地探讨这些内容。

图生成

高级图历史

一两年前,我们通过高级图将图生成成本从用户代码编写时间转移到了图优化时间。

y = x + 1                 # graph generation used to happen here
(y,) = dask.optimize(y,)  # now it happens here

这确实提高了可用性,也让我们进行了一些高级优化,有时使我们能够跳过一些低级优化成本。

我们可以进一步推进吗?

我们管道的前四个阶段发生在客户端。

  1. 图生成:Dask 集合库中的一些 Python 代码(例如 dask array)调用 sum 函数,这会在客户端生成一个任务图。
  2. 图优化:然后我们在客户端优化该图,以移除不必要的工作,融合任务,应用重要的高级优化等等。
  3. 图序列化:现在我们将该图打包成可以发送到调度器的形式。
  4. 图通信:我们将这些字节通过网络发送到调度器。

如果我们能够在这些阶段中一直保持高级图表示,直到图通信,那么我们可以将一个更紧凑的表示发送到调度器。我们可以减少很多这些成本,至少对于高级集合 API 是如此(delayed 和 client.submit 仍然会很慢,但 client.map 可能会好一些)。

这还有一些其他不错的好处

  1. 用户代码不会阻塞,我们可以立即通知用户我们正在处理。
  2. 我们将成本集中在调度器上,所以现在只有一个地方我们可能需要考虑低级代码。

(此处有一些讨论:https://github.com/dask/distributed/issues/3872)

然而,低级图优化将是一个问题。

原则上,修改分布式调度器以接受各种图层类型是一个繁琐但直接的问题。我不担心。

更大的担忧是如何处理低级图优化。今天,有三个非常重要的低级图优化:

  1. 任务融合:这使得你的 read_parquet 任务与后续的块级任务合并。
  2. 剪枝 (Culling):这使得 df.head()x[0] 变得快速。
  3. 切片融合 (Slice fusion):这就是为什么 x[:100][5] 运行得好的原因。

为了将抽象图层传输到调度器,我们需要消除对这些低级图优化的需求。我认为我们可以结合两种方法来做到这一点:

更巧妙的高级图操作

我们已经通过块级操作做到了一点,它有自己的融合机制,并很大程度上消除了对一般融合的需求。但其他类似块级的操作,比如 read_*,可能需要加入 Blockwise 家族。

要使剪枝正常工作,可能需要我们教会每个独立的图层如何跟踪每种层类型中的依赖关系并自行剪枝。这可能会变得很棘手。

切片是可行的,我们只需要有人深入研究所有当前的切片优化,并为这些计算创建高级图层。这将是优秀硕士生的绝佳项目。

向 workers 发送推测性任务

高级块级融合处理了许多低级融合的用例,但不是全部。例如,像 dd.read_parquetda.from_zarr 这样的 I/O 层在高级别上没有融合。

我们可以通过将它们变成块级层来解决这个问题(这需要扩展块级抽象,可能很困难),或者,如果我们非常有信心知道它们将去哪里,我们可以开始在所有依赖项完成之前将尚未就绪的任务发送给 workers。这将给我们带来与融合类似的一些结果,但会保持所有任务类型独立(这对于诊断很有用),并且仍然可能给我们带来与融合相同的一些性能优势。

在调度器上解包抽象图层

因此,在我们消除了对低级优化的需求并直接将抽象图层发送到调度器之后,我们需要教会调度器如何解包这些图层。

这有点棘手,因为调度器不能运行用户 Python 代码(出于安全原因)。我们需要提前注册调度器知道并信任的层类型(如 blockwise, rechunk, dataframe shuffle)。我们仍然会一直支持自定义层,并且它们的速度将与以往相同,但如果我们完全采用高级层,希望对它们的需求会大大减少。

用低级语言重写调度器

一旦大部分棘手的部分被移到调度器,我们将有一个地方可以专注于低级图状态操作。

Dask 的分布式调度器由两部分组成

  1. 一个 Tornado TCP 应用程序,接收来自客户端和 workers 的信号,并将信号发送给客户端和 workers。

    这是大量使用异步的网络代码。

  2. 一个内部复杂的有限状态机,响应这些状态变化。

    这是大量使用复杂数据结构的 Python 代码。

网络

Jim 在这里有一个有趣的项目,显示出希望:https://github.com/jcrist/ery 减少 workers 和调度器之间的延迟将是件好事,并有助于加速本文开头列出的管道中的阶段 7-8。

状态机

用一些低级语言重写状态机是可以的。理想情况下,这将是一种对当前维护者社区易于维护的语言(Cython?),但我们也可能在此处建立一个更稳固的接口,允许其他团队安全地进行实验。

这有一些优点(不同团队可以进行更多实验),但也有一些成本(核心工作分散和用户不匹配)。而且,我怀疑拆分也可能意味着我们可能会失去仪表板,除非这些其他团队非常小心地将相同的状态暴露给 Bokeh。

这里还有更多探索要做。无论如何,我认为尝试将状态机与网络系统隔离可能是明智的。这或许也能让人们更容易独立地进行性能分析。

在与一些不同的团队交谈时,大多数人表示对拥有多个不同的状态机代码有所保留。MapReduce 和 Spark 都曾这样做过,结果导致社区动态难以维护。

高级图优化

一旦我们拥有了更智能的高级图层中的所有内容,我们将更容易进行优化。

我们需要一种更好的方法来记录这些优化,需要一个独立的遍历系统和一套规则。我们中的一些人之前已经写过这些东西,也许是时候重新审视它们了。

我们需要什么

这需要一些努力,但我认为它将同时解决几个备受关注的问题。有一些棘手的事情需要做好:

  1. 一个高级图层框架
  2. 一个高级图层优化系统
  3. 将调度器分成两个部分

为此,我认为我们需要对 Dask 相当熟悉的人才能做好。

还有相当多的后续工作

  1. 为 dask dataframe 构建一个层级结构
  2. 为 dask array 构建一个层级结构
  3. 为这些层构建优化,以消除对低级图优化的需求
  4. 用 Cython 重写调度器的核心部分
  5. 尝试网络层,也许使用新的 Comm

我一直在思考实施这项变更的正确方法。过去几年,由于维护者负担沉重,大多数 Dask 的变更都是增量或外围的。然而,这个问题可能面临足够的压力,我们可以从一些组织那里获得一些专门的工程资源,这可能会改变实施的可能性。我们已经从一些团队那里获得了 25% 的时间投入。我很好奇我们是否能为一些人争取几个月的 100% 全职投入。


博客评论由 Disqus 提供支持