Dask、Pandas 和 GPU:第一步
作者:Matthew Rocklin
执行摘要
我们正在利用 cuDF 和 Dask Dataframe 构建一个分布式 GPU Pandas dataframe。这项工作刚刚起步。
本文描述了当前的情况、我们的总体方法,并提供了目前哪些功能可用、哪些不可用的示例。最后,我们提供了一些关于扩展性能的说明。
您也可以将本文中的实验视为notebook。
以下是结果表
架构 | 时间 | 带宽 |
---|---|---|
单 CPU 核心 | 3分 14秒 | 50 MB/秒 |
八 CPU 核心 | 58秒 | 170 MB/秒 |
四十 CPU 核心 | 35秒 | 285 MB/秒 |
一个 GPU | 11秒 | 900 MB/秒 |
八个 GPU | 5秒 | 2000 MB/秒 |
构建模块:cuDF 和 Dask
构建一个分布式 GPU 支持的 dataframe 是一项艰巨的任务。幸运的是,我们正从良好的基础开始,并且可以利用现有组件组装这个系统的大部分。
-
的 cuDF 库旨在在 GPU 上实现 Pandas API。它在读取 CSV 文件、过滤和聚合列、连接等标准操作上获得了显著的加速。
import cudf # looks and feels like Pandas, but runs on the GPU df = cudf.read_csv('myfile.csv') df = df[df.name == 'Alice'] df.groupby('id').value.mean()
cuDF 是不断发展的 RAPIDS 项目的一部分。
-
的 Dask Dataframe 库围绕 Pandas API 提供了并行算法。它根据任务图将分布式 groupbys 或分布式连接等大型操作分解为许多较小的单节点 groupbys 或连接(以及许多其他操作)。
import dask.dataframe as dd # looks and feels like Pandas, but runs in parallel df = dd.read_csv('myfile.*.csv') df = df[df.name == 'Alice'] df.groupby('id').value.mean().compute()
-
的 Dask 分布式任务调度器针对复杂的任务图提供了通用的并行执行。它非常适合将多节点计算添加到现有代码库中。
鉴于这些构建模块,我们的方法是使 cuDF API 足够接近 Pandas,以便我们可以重用 Dask Dataframe 算法。
这种方法的优势与挑战
这种方法有几个优势
-
我们可以重用最初为 Pandas 设计的 Dask Dataframe 中的并行算法。
-
它将开发工作整合到一个代码库中,这样将来在 CPU Dataframe 上花费的精力也能惠及 GPU Dataframe,反之亦然。维护成本得以分摊。
-
通过构建与两种 DataFrame 实现(CPU 和 GPU)都能很好协同工作的代码,我们建立了约定和协议,这将使其他项目更容易做到这一点,无论是使用这两个类似 Pandas 的库,还是使用未来的类似 Pandas 的库。
这种方法还旨在证明生态系统应该支持类似 Pandas 的库,而不仅仅是 Pandas。例如,如果(何时?)Arrow 库开发了计算系统,那么我们将能更好地将其融入进来。
-
在进行任何重构时,我们往往会清理现有代码。
例如,为了让 dask dataframe 支持新的 GPU Parquet 读取器,我们最终重构并简化了我们的 Parquet I/O 逻辑。
这种方法也有一些缺点。特别是,它给 cuDF 带来了 API 压力,使其需要与 Pandas 匹配,因此
-
API 的微小差异现在会导致更大的问题,例如以下几点
-
cuDF 承受着压力,需要重复一些人认为是 Pandas API 中的错误。
例如,cuDF 今天对缺失值的支持可以说比 Pandas 更合理。cuDF 是否应该仅仅为了匹配 Pandas 的语义而退回到旧的方式?Dask Dataframe 可能需要更灵活,以便处理演变和语义上的微小差异。
替代方案
我们也可以围绕 cuDF 编写一个新的 dask-dataframe 风格的项目,该项目与 Pandas/Dask Dataframe API 有所不同。直到最近,这实际上一直是采用的方法,dask-cudf 项目正是这样做的。这可能是早期入门和原型开发的好选择。该项目能够使用 dask delayed 实现广泛的功能,包括 groupby-aggregations、 joins 等等。
然而,我们现在正在 Dask dataframe 的基础上重新进行这项工作,这意味着我们正在失去 dask-cudf 已经拥有的一些功能,但希望我们现在添加的功能将更稳定,并建立在更坚实的基础上。
当前状态
目前只有很少的功能可用,但可用的功能都相当流畅。
这是一个简单的示例,它从多个 CSV 文件中读取一些数据,选取一列,并进行一些聚合。
from dask_cuda import LocalCUDACluster
import dask_cudf
from dask.distributed import Client
cluster = LocalCUDACluster() # runs on eight local GPUs
client = Client(cluster)
gdf = dask_cudf.read_csv('data/nyc/many/*.csv') # wrap around many CSV files
>>> gdf.passenger_count.sum().compute()
184464740
另请注意,纽约市出租车乘客量比几年前显著减少了
我对上述示例感到兴奋的地方
-
围绕 cuDF 代码的所有基础设施,如集群设置、诊断、JupyterLab 环境等等,都像任何其他新的 Dask 项目一样,是免费提供的。
这是我的 JupyterLab 设置图像
-
我们的
df
对象实际上只是一个普通的 Dask Dataframe。我们无需编写新的__repr__
、__add__
或.sum()
实现,而且许多我们没有想到的函数今天也能很好地工作(尽管也有许多不行)。 -
我们紧密集成并与其他系统更紧密地连接。例如,如果我们想将 dask-cudf-dataframe 转换为 dask-pandas-dataframe,我们只需使用 cuDF 的
to_pandas
函数即可df = df.map_partitions(cudf.DataFrame.to_pandas)
我们不必编写任何特殊内容,例如单独的
.to_dask_dataframe
方法或处理其他特殊情况。Dask 的并行性与选择 CPU 还是 GPU 是正交的。
-
切换硬件很容易。通过避免单独的
dask-cudf
代码路径,更容易将 cuDF 添加到现有的 Dask+Pandas 代码库中以在 GPU 上运行,或者如果我们希望代码在没有 GPU 的情况下也能运行,则移除 cuDF 并使用 Pandas。在下面的扩展部分中有更多示例。
上述示例存在的问题
总的来说,答案是许多小问题。
-
的
cudf.read_csv
函数尚不支持从单个 CSV 文件分块读取,因此对于非常大的 CSV 文件效果不佳。我们不得不先使用普通的 Dask+Pandas 将大型 CSV 文件拆分成许多小型 CSV 文件import dask.dataframe as dd (df = dd.read_csv('few-large/*.csv') .repartition(npartitions=100) .to_csv('many-small/*.csv', index=False))
(参见 rapidsai/cudf #568)
-
许多在 dask-cudf 中曾经可用的操作,例如 groupby-aggregations 和 joins,现在不再工作。在接下来的几个月里,我们需要稍微修改许多 cuDF API,使其更紧密地匹配其 Pandas 等价物。
-
我运行了两次计时单元,因为它目前导入
cudf
需要几秒钟。 rapidsai/cudf #627 -
我们不得不让 Dask Dataframe 更加灵活,并减少对其组成 dataframe 必须完全是 Pandas dataframe 的假设。(参见dask/dask #4359 和 dask/dask #4375 的示例)。我怀疑将来还需要许多这样的小修改。
这些问题代表了数十个类似的问题。它们都是可以修复的,事实上,许多问题目前正由致力于 RAPIDS 的优秀人士积极修复。
近期计划
RAPIDS 团队目前正忙于发布 0.5 版本,其中包括运行上述示例所需的一些修复,以及许多不相关的稳定性改进。这可能会让他们忙碌一到两周,在此期间,我预计除了规划之外,Dask + cuDF 的工作不会有很多进展。
在那之后,Dask 并行性支持将是重中之重,因此我期待看到在此方面取得一些快速进展。
扩展结果
在我上一篇关于将 Dask Array 与 CuPy(一个 GPU 加速的 Numpy)结合的博文中,我们看到在处理一些简单的随机数据这个简单问题上,使用多个 GPU 获得了令人印象深刻的加速。
Dask Array + CuPy 处理随机数据
架构 | 时间 |
---|---|
单 CPU 核心 | 2小时 39分 |
四十 CPU 核心 | 11分 30秒 |
一个 GPU | 1分 37秒 |
八个 GPU | 19秒 |
那个练习很容易扩展,因为它几乎完全受限于创建随机数据的计算。
Dask DataFrame + cuDF 处理 CSV 数据
我们对上述 read_csv
示例进行了类似的研究,该示例主要受限于从磁盘读取 CSV 数据然后解析它。您可以在这里看到可用的 notebook。我们展示了相似(尽管不太令人印象深刻)的数字。
架构 | 时间 | 带宽 |
---|---|---|
单 CPU 核心 | 3分 14秒 | 50 MB/秒 |
八 CPU 核心 | 58秒 | 170 MB/秒 |
四十 CPU 核心 | 35秒 | 285 MB/秒 |
一个 GPU | 11秒 | 900 MB/秒 |
八个 GPU | 5秒 | 2000 MB/秒 |
带宽数字是通过注意到磁盘上的数据约为 10 GB 计算得出的
分析
首先,我想再次强调,由于所有不同项目之间存在 Pandas API 兼容性,使用此设置可以轻松测试各种架构。我们看到在各种不同硬件上,性能范围很广(40 倍跨度),且成本点各异。
其次,请注意,这个问题在 CPU 和 GPU 上的扩展性都不如我们之前使用 CuPy 的示例。我怀疑这是因为这个示例也受限于 I/O,而不仅仅是计算。虽然从单 CPU 到单 GPU 的飞跃很大,但从单 CPU 到多 CPU 或从单 GPU 到多 GPU 的飞跃不如我们期望的那么大。例如,对于 GPU,当我们添加 8 倍多的 GPU 时,速度提升了大约 2 倍。
起初,人们可能会认为这是因为我们使磁盘读取速度饱和了。然而,有两个证据与这种猜测相悖
- 熟悉我当前硬件的 NVIDIA 人员告诉我,他们仔细操作时能够获得更高的 I/O 吞吐量
- CPU 的扩展性同样不佳,尽管它显然没有达到完全的 I/O 带宽
相反,很可能只是我们对磁盘和 I/O 流水线的处理不够仔细。
我们可以考虑更仔细地考虑单台机器内的数据局部性问题。或者,我们也可以选择使用更小的机器,或者许多更小的机器。我的团队一直要求我开始尝试一些比 DGX 更便宜的系统,我可能很快就会进行这方面的实验。对于数据加载和预处理工作负载而言,过去“将尽可能多的计算打包到一台机器中”的经验之谈可能不再适用(除非我们做更多的工作)。
来帮忙
如果您对上述工作感兴趣,那就来帮忙吧!有很多容易实现且影响深远的工作可以做。
如果您有兴趣专注于这些主题并获得报酬,那么请考虑申请一份工作。NVIDIA 的 RAPIDS 团队正在招聘工程师,从事 Dask 与 GPU 的开发以及其他数据分析库的开发项目。
博客评论由 Disqus 提供支持