为 Dask 构建 GPU Groupby-Aggregations
作者:Matthew Rocklin
摘要
我们已经充分对齐了 Dask DataFrame 和 cuDF,使得诸如以下的 groupby 聚合能够很好地工作。
df.groupby('x').y.mean()
本文描述了我们为此所做的工作,以作为未来开发的模型。
计划
正如之前一篇文章,“Dask、Pandas 和 GPU:第一步”中所述,我们生产分布式 GPU 数据框的计划是将Dask DataFrame与cudf结合起来。特别是,我们必须
- 修改 Dask DataFrame,使其不仅能并行化围绕其目前使用的 Pandas DataFrame,还能并行化任何足够像 Pandas DataFrame 的东西。
- 修改 cuDF,使其足够像 Pandas DataFrame,以适应 Dask DataFrame 中的算法。
改动
在 Dask 方面,这主要意味着替换
- 用
is_dataframe_like(df)
检查替换isinstance(df, pd.DataFrame)
检查(在定义了合适的is_dataframe_like
/is_series_like
/is_index_like
函数之后) - 避免使用 Pandas 中一些更具特色的功能,转而尝试使用我们期望在大多数 DataFrame 实现中都存在的通用功能。
在 cuDF 方面,这意味着进行几十个微小的修改,以使 cuDF API 与 Pandas API 对齐,并添加缺失的功能。
- Dask 改动
- 移除显式的 pandas 检查并提供 cudf 延迟注册 #4359
- 将 isinstance(…, pandas) 替换为 is_dataframe_like #4375
- 添加 has_parallel_type
- 延迟注册更多 cudf 函数并移至 backends 文件 #4396
- 避免在 is_dataframe_like 中检查类型 #4418
- 将特定于 cudf 的代码替换为 dask-cudf 导入 #4470
- 在 groupby-var 中避免使用 groupby.agg(callable) #4482 – 这一项值得注意,通过简化我们对 Pandas 的使用,我们在 Pandas 端实际上获得了显著的加速。
- cuDF 改动
- 从 CUDA 数组库构建 DataFrames #529
- Groupby AttributeError
- 支持索引上的比较操作 #556
- 在 read_csv(及其他格式)中支持字节范围 #568:w
- 允许 “df.index = some_index” #824
- 支持在 groupby 对象上进行索引 #828
- 支持 df.reset_index(drop=True) #831
- 添加 Series.groupby #879
- 支持 Dataframe/Series groupby level=0 #880
- 实现 DataFrame 对象的除法 #900
- Groupby 对象不能通过列名进行索引 #934
- 支持索引操作上的比较 #937
- 添加 DataFrame.rename #944
- 设置 dataframe/series 的索引 #967
- 支持 concat(…, axis=1) #968
- 支持使用列中的 pandas 索引进行索引 #969
- 支持用另一个布尔 dataframe 对 dataframe 进行索引 #970
我并不期望任何人仔细查阅所有这些 issue,但我希望通过粗略浏览 issue 标题,人们能够了解我们正在进行的改动类型。这是大量的小改动。
另外,感谢Thomson Comer,他解决了上述大部分 cuDF issue。
还有一些待解决的 issue
- 平方根 #1055,groupby-std 需要
-
gropuby.agg({'x': ['sum', mean'], 'y': ['min', 'max']})
但大部分功能已经可用
但总的来说,目前大部分功能运行良好
In [1]: import dask_cudf
In [2]: df = dask_cudf.read_csv('yellow_tripdata_2016-*.csv')
In [3]: df.groupby('passenger_count').trip_distance.mean().compute()
Out[3]: <cudf.Series nrows=10 >
In [4]: _.to_pandas()
Out[4]:
0 0.625424
1 4.976895
2 4.470014
3 5.955262
4 4.328076
5 3.079661
6 2.998077
7 3.147452
8 5.165570
9 5.916169
dtype: float64
经验
首先,大部分工作由 cuDF 开发者完成(这从上面 issue 列表的相对长度中可能很明显)。当我们开始这个过程时,感觉就像一个永无止境的小问题流。我们无法预见下一批问题,直到我们解决了当前的一批。幸运的是,大多数问题都很容易修复。此外,随着工作的进展,随着时间的推移似乎变得更容易了一些。
此外,由于上述改动,除了 groupby-aggregations 之外的许多功能也得以正常工作。从习惯使用 Pandas 的用户的角度来看,cuDF 库正变得越来越可靠。在使用 cuDF 进行其他操作时,遇到缺失功能的频率降低了。
接下来是什么?
最近,我们一直在 Dask DataFrame 中处理各种连接/合并操作,例如排序列上的索引连接,大小数据框之间的连接(一个常见的特例)等等。将这些算法从主线 Dask DataFrame 代码库移植到与 cuDF 配合使用,导致了与上面 groupby-aggregations 类似的一系列问题,但到目前为止,列表要小得多。我们希望随着未来继续处理其他功能集,例如 I/O、时间序列操作、滚动窗口等,这是一个趋势。
博客评论由 Disqus 提供支持