摘要

我们已经充分对齐了 Dask DataFrame 和 cuDF,使得诸如以下的 groupby 聚合能够很好地工作。

df.groupby('x').y.mean()

本文描述了我们为此所做的工作,以作为未来开发的模型。

计划

正如之前一篇文章,“Dask、Pandas 和 GPU:第一步”中所述,我们生产分布式 GPU 数据框的计划是将Dask DataFramecudf结合起来。特别是,我们必须

  • 修改 Dask DataFrame,使其不仅能并行化围绕其目前使用的 Pandas DataFrame,还能并行化任何足够像 Pandas DataFrame 的东西。
  • 修改 cuDF,使其足够像 Pandas DataFrame,以适应 Dask DataFrame 中的算法。

改动

在 Dask 方面,这主要意味着替换

  • is_dataframe_like(df) 检查替换 isinstance(df, pd.DataFrame) 检查(在定义了合适的 is_dataframe_like/is_series_like/is_index_like 函数之后)
  • 避免使用 Pandas 中一些更具特色的功能,转而尝试使用我们期望在大多数 DataFrame 实现中都存在的通用功能。

在 cuDF 方面,这意味着进行几十个微小的修改,以使 cuDF API 与 Pandas API 对齐,并添加缺失的功能。

我并不期望任何人仔细查阅所有这些 issue,但我希望通过粗略浏览 issue 标题,人们能够了解我们正在进行的改动类型。这是大量的小改动。

另外,感谢Thomson Comer,他解决了上述大部分 cuDF issue。

还有一些待解决的 issue

但大部分功能已经可用

但总的来说,目前大部分功能运行良好

In [1]: import dask_cudf

In [2]: df = dask_cudf.read_csv('yellow_tripdata_2016-*.csv')

In [3]: df.groupby('passenger_count').trip_distance.mean().compute()
Out[3]: <cudf.Series nrows=10 >

In [4]: _.to_pandas()
Out[4]:
0    0.625424
1    4.976895
2    4.470014
3    5.955262
4    4.328076
5    3.079661
6    2.998077
7    3.147452
8    5.165570
9    5.916169
dtype: float64

经验

首先,大部分工作由 cuDF 开发者完成(这从上面 issue 列表的相对长度中可能很明显)。当我们开始这个过程时,感觉就像一个永无止境的小问题流。我们无法预见下一批问题,直到我们解决了当前的一批。幸运的是,大多数问题都很容易修复。此外,随着工作的进展,随着时间的推移似乎变得更容易了一些。

此外,由于上述改动,除了 groupby-aggregations 之外的许多功能也得以正常工作。从习惯使用 Pandas 的用户的角度来看,cuDF 库正变得越来越可靠。在使用 cuDF 进行其他操作时,遇到缺失功能的频率降低了。

接下来是什么?

最近,我们一直在 Dask DataFrame 中处理各种连接/合并操作,例如排序列上的索引连接,大小数据框之间的连接(一个常见的特例)等等。将这些算法从主线 Dask DataFrame 代码库移植到与 cuDF 配合使用,导致了与上面 groupby-aggregations 类似的一系列问题,但到目前为止,列表要小得多。我们希望随着未来继续处理其他功能集,例如 I/O、时间序列操作、滚动窗口等,这是一个趋势。


博客评论由 Disqus 提供支持