这项工作由 Anaconda Inc 支持

摘要

Dask DataFrame 与 pandas 新的 Extension Array 接口配合良好,包括第三方扩展数组。这使得 Dask 可以

  1. 轻松支持 pandas 的新扩展数组,例如它们新的 可空整数数组
  2. 支持第三方扩展数组,例如 cyberpandas 的 IPArray

背景

Pandas 0.23 引入了 ExtensionArray,这是一种在 DataFrame 或 Series 中存储除简单 NumPy 数组之外数据的方式。在内部,pandas 将其用于 NumPy 原生不支持的数据类型,例如带时区的时间日期、Categorical 或(新的!)可空整数数组。

>>> s = pd.Series(pd.date_range('2000', periods=4, tz="US/Central"))
>>> s
0   2000-01-01 00:00:00-06:00
1   2000-01-02 00:00:00-06:00
2   2000-01-03 00:00:00-06:00
3   2000-01-04 00:00:00-06:00
dtype: datetime64[ns, US/Central]

dask.dataframe 一直支持 pandas 定义的扩展类型。

>>> import dask.dataframe as dd
>>> dd.from_pandas(s, npartitions=2)
Dask Series Structure:
npartitions=2
0    datetime64[ns, US/Central]
2                           ...
3                           ...
dtype: datetime64[ns, US/Central]
Dask Name: from_pandas, 2 tasks

挑战

较新版本的 pandas 允许第三方库编写自定义扩展数组。这些数组可以放在 DataFrame 或 Series 中,并且与 pandas 自身定义的任何扩展数组一样好用。然而,第三方扩展数组给 Dask 带来了一点挑战。

回顾:dask.dataframe 是惰性的。我们使用熟悉的类 pandas API 来构建任务图,而不是立即执行。但是,如果 Dask DataFrame 是惰性的,那么以下操作是如何工作的?

>>> df = pd.DataFrame({"A": [1, 2], 'B': [3, 4]})
>>> ddf = dd.from_pandas(df, npartitions=2)
>>> ddf[['B']].columns
Index(['B'], dtype='object')

ddf[['B']](惰性地)从 dataframe 中选择列 'B'。但是访问 .columns立即返回一个仅包含所选列的 pandas Index 对象。

没有发生实际计算(您可以轻松地将 from_pandas 换成 dd.read_parquet 处理大于内存的数据集,行为也是一样的)。Dask 能够进行这种“仅元数据”的计算,即输出仅依赖于列和 dtypes,而不执行任务图。在内部,Dask 通过在每个 Dask DataFrame 上保留一对虚拟 pandas DataFrame 来实现这一点。

>>> ddf._meta
Empty DataFrame
Columns: [A, B]
Index: []

>>> ddf._meta_nonempty
ddf._meta_nonempty
   A  B
0  1  1
1  1  1

我们需要 _meta_nonempty,因为 pandas 中的某些操作在空 DataFrame 上的行为与非空 DataFrame 不同(无论是设计使然,还是偶尔出现的 pandas 错误)。

第三方扩展数组的问题在于 Dask 不知道在 _meta_nonempty 中放入什么值。我们很高兴为每个 NumPy dtype 以及 pandas 自身的每个扩展 dtype 执行此操作。但是任何第三方库都可以为任何类型创建一个 ExtensionArray,Dask 将无法知道其有效值是什么。

解决方案

与其让 Dask 猜测 _meta_nonempty 使用什么值,扩展数组作者(或用户)可以将他们的扩展 dtype 注册到 Dask。一旦注册,Dask 就能够生成 _meta_nonempty,之后一切都应该正常工作。例如,我们可以将 pandas 用于测试的虚拟 DecimalArray(这不是 pandas 公共 API 的一部分)注册到 Dask。

from decimal import Decimal
from pandas.tests.extension.decimal import DecimalArray, DecimalDtype

# The actual registration that would be done in the 3rd-party library
from dask.dataframe.extensions import make_array_nonempty


@make_array_nonempty.register(DecimalDtype)
def _(dtype):
    return DecimalArray._from_sequence([Decimal('0'), Decimal('NaN')],
                                       dtype=dtype)

现在,该扩展类型的用户可以将这些数组放入 Dask DataFrame 或 Series 中。

>>> df = pd.DataFrame({"A": DecimalArray([Decimal('1.0'), Decimal('2.0'),
...                                       Decimal('3.0')])})

>>> ddf = dd.from_pandas(df, 2)
>>> ddf
Dask DataFrame Structure:
                     A
npartitions=1
0              decimal
2                  ...
Dask Name: from_pandas, 1 tasks

>>> ddf.dtypes
A    decimal
dtype: object

从那里开始,可以像在 pandas 中一样执行常规操作。

>>> from random import choices
>>> df = pd.DataFrame({"A": DecimalArray(choices([Decimal('1.0'),
...                                               Decimal('2.0')],
...                                              k=100)),
...                    "B": np.random.choice([0, 1, 2, 3], size=(100,))})
>>> ddf = dd.from_pandas(df, 2)
In [35]: ddf.groupby("A").B.mean().compute()
Out[35]:
A
1.0    1.50
2.0    1.48
Name: B, dtype: float64

真正的启示

Dask 现在支持扩展数组真是太棒了。但对我来说,令人兴奋的是这项工作所花费的精力如此之少。实现第三方扩展数组支持的 PR 非常简短,只需定义第三方注册的对象,并在检测到 dtype 时使用它来生成数据。支持 pandas 0.24.0 中的三种新扩展数组(IntegerArrayPeriodArrayIntervalArray),只需要几行代码

@make_array_nonempty.register(pd.Interval):
def _(dtype):
    return IntervalArray.from_breaks([0, 1, 2], closed=dtype.closed)


@make_array_nonempty.register(pd.Period):
def _(dtype):
    return period_array([2000, 2001], freq=dtype.freq)


@make_array_nonempty.register(_IntegerDtype):
def _(dtype):
    return integer_array([0, None], dtype=dtype)

Dask 直接受益于 pandas 的改进。Dask 无需构建新的并行扩展数组接口,也无需使用并行接口重新实现所有新的扩展数组。我们只是重用了 pandas 已经完成的工作,并且它与现有的 Dask 结构相吻合。

对于第三方扩展数组的作者,例如 cyberpandas,工作量也同样微不足道。他们无需从头开始重新实现所有功能,就能与 Dask 良好配合。

这突出了 Dask 项目核心价值观之一的重要性:与社区合作。如果您访问 dask.org,您会看到诸如

与现有项目集成

以及

与更广泛的社区共同构建

在 Dask 启动之初,开发者本可以从头开始重写 pandas 或 NumPy 以使其并行友好(尽管我们今天可能仍在进行那部分工作,因为那是一项艰巨的任务)。相反,Dask 开发者选择与社区合作,偶尔引导其朝着有助于 dask 的方向发展。例如,pandas 中许多地方持有 GIL,阻止了基于线程的并行。Dask 和 pandas 开发者并没有放弃 pandas,而是一起努力,在 dask.dataframe 成为瓶颈时,尽可能地释放 GIL。这使 Dask 以及任何尝试使用 pandas DataFrames 进行基于线程的并行的人都受益。

现在,当 pandas 引入可空整数等新功能时,dask.dataframe 只需将其注册为扩展类型,即可立即受益。第三方扩展数组的作者也可以对他们的扩展数组执行相同的操作。

如果您正在编写 ExtensionArray,请务必将其添加到 pandas 生态系统页面,并将其注册到 Dask!


Disqus 提供支持的博客评论