这项工作由 CoiledNVIDIA 工程师实现和支持。特别感谢 Patrick HoeflerRick Zamora。本文的原始版本发布在 blog.coiled.io

Expression tree encoded by dask-expr

引言

Dask DataFrame 目前不会(像 Spark 或 SQL 数据库那样)为你优化代码。这意味着用户会浪费很多计算资源。我们来看一个常见的例子,它乍一看没问题,但实际上效率很低。

import dask.dataframe as dd

df = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",  # unnecessarily reads all rows and columns
)
result = (
    df[df.hvfhs_license_num == "HV0003"]    # could push the filter into the read parquet call
    .sum(numeric_only=True)
    ["tips"]                                # should read only necessary columns
)

通过几个简单的步骤,我们可以让它运行得更快

df = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
    filters=[("hvfhs_license_num", "==", "HV0003")],
    columns=["tips"],
)
result = df.tips.sum()

目前,Dask DataFrame 不会为你优化这一点,但一项围绕 Dask DataFrame 逻辑查询规划的新工作将为你实现这一目标。本文介绍了一些在 dask-expr 中开发的这些变化。

你可以使用以下方式安装和尝试 dask-expr

pip install dask-expr

本文中使用的是 纽约市出租车 数据集。

Dask 表达式

Dask expressions 在 Dask DataFrames 之上提供了一个逻辑查询规划层。我们来看看最初的例子,并研究如何通过查询优化层提高效率。正如最初指出的,有几点不理想

  • 我们将所有行读入内存,而不是在读取 parquet 文件时进行过滤。
  • 我们将所有列读入内存,而不是只读取必需的列。
  • 我们将过滤和聚合应用于所有列,而不是只应用于 "tips" 列。

dask-expr 的查询优化层可以帮助我们解决这个问题。它会查看这个表达式并确定并非所有行都是必需的。一个中间层会将过滤器转换为 read_parquet 的有效过滤表达式

df = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
    filters=[("hvfhs_license_num", "==", "HV0003")],
)
result = df.sum(numeric_only=True)["tips"]

这仍然会将所有列读入内存,并将计算每个数值列的总和。下一个优化步骤是将列选择也推入 read_parquet 调用中。

df = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
    columns=["tips"],
    filters=[("hvfhs_license_num", "==", "HV0003")],
)
result = df.sum(numeric_only=True)

这是一个可以手动重写的简单示例。更接近实际工作流程的使用场景可能有数百列,如果你需要其中的非简单子集,重写会非常耗时耗力。

让我们看看如何实现这一点。dask-expr 将用户提供的表达式记录在表达式树中

result.pprint()

Projection: columns='tips'
  Sum: numeric_only=True
    Filter:
      ReadParquet: path='s3://coiled-datasets/uber-lyft-tlc/'
      EQ: right='HV0003'
        Projection: columns='hvfhs_license_num'
          ReadParquet: path='s3://coiled-datasets/uber-lyft-tlc/'

这棵树表示表达式的原始形式。我们可以看到,在应用投影和过滤之前,我们会将整个数据集读入内存。值得注意的一点是:看起来我们正在读取数据集两次,但 Dask 能够融合执行相同任务的作业以避免重复计算。让我们重新排序表达式以使其更高效

result.simplify().pprint()

Sum: numeric_only=True
  ReadParquet: path='s3://coiled-datasets/uber-lyft-tlc/'
               columns=['tips']
               filters=[('hvfhs_license_num', '==', 'HV0003')]

这看起来简单多了。dask-expr 重新排序了查询,并将过滤器和列投影推入了 read_parquet 调用中。我们成功地从表达式树中移除了一些步骤,并使剩余的表达式更加高效。这代表了我们最初手动完成的步骤。dask-expr 可以对任意多列执行这些步骤,而不会增加开发人员的负担。

这些只是 dask-expr 中最常见且最容易说明的两种优化技术。一些其他有用的优化已经可用

  • len(...) 将只使用索引计算长度;此外,我们可以忽略许多不会改变 DataFrame 形状的操作,例如 replace 调用。
  • set_indexsort_values 不会立即触发计算。
  • 更智能地选择 merge 算法。

我们仍在添加更多优化技术,以使 Dask DataFrame 查询更高效。

试用一下

该项目目前的状态是,有兴趣的用户应该尝试一下。我们已经发布了一些版本。API 涵盖了 Dask DataFrame API 的很大一部分,并且我们还在不断添加更多内容。我们已经观察到,对于受益于查询优化的工作流程,性能有了非常显著的提升。这些工作流程的内存使用量也有所下降。

我们非常期待您的反馈意见以及改进库的潜在途径。请试用一下,并与我们分享您的体验。

dask-expr 尚未集成到主要的 Dask DataFrame 实现中。您可以使用以下方式安装它

pip install dask-expr

该 API 与 Dask DataFrame 提供的非常相似。它暴露了与 Dask DataFrame 大致相同的方法。在大多数情况下,您可以使用相同的方法。

import dask_expr as dd

您可以在 Readme 中找到支持的操作列表。这个项目仍在积极开发中。API 可能会在没有警告的情况下发生变化。我们的目标是每周发布版本,以便尽快推出新功能。

我们为什么要现在添加这个?

从历史上看,Dask 专注于灵活性和智能调度,而不是查询优化。Dask 内置的分布式调度器使用复杂的算法来确保单个任务的理想调度。它试图确保你的资源得到尽可能高效的利用。图构建过程使 Dask 用户能够构建非常灵活和复杂的图,这些图超出了 SQL 操作的范围。Dask futures API 提供的灵活性需要非常智能的算法,但它使用户能够构建高度复杂的图。下图显示了一个信用风险模型的图

Computation graph representing a credit risk model

强大的调度器和物理优化的特性使我们能够构建非常复杂的程序,然后这些程序将高效运行。不幸的是,这些优化的特性并不能帮助我们避免调度不必要的工作。这就是目前将高级查询优化构建到 Dask 中的工作重点。

结论

Dask 带有非常智能的分布式调度器,但没有多少逻辑查询规划。这正是我们目前通过在 Dask DataFrame 中构建高级查询优化器来纠正的问题。我们期望改进普通 Dask 工作流程的性能并减少内存使用。

这个 API 已经准备好供有兴趣的用户试用。它涵盖了 DataFrame API 的很大一部分。该库正在积极开发中,我们预计在未来几周和几个月内添加更多有趣的功能。


博客评论由 Disqus 提供支持