执行摘要

我们正在利用 cuDFDask Dataframe 构建一个分布式 GPU Pandas dataframe。这项工作刚刚起步。

本文描述了当前的情况、我们的总体方法,并提供了目前哪些功能可用、哪些不可用的示例。最后,我们提供了一些关于扩展性能的说明。

您也可以将本文中的实验视为notebook

以下是结果表

架构 时间 带宽
单 CPU 核心 3分 14秒 50 MB/秒
八 CPU 核心 58秒 170 MB/秒
四十 CPU 核心 35秒 285 MB/秒
一个 GPU 11秒 900 MB/秒
八个 GPU 5秒 2000 MB/秒

构建模块:cuDF 和 Dask

构建一个分布式 GPU 支持的 dataframe 是一项艰巨的任务。幸运的是,我们正从良好的基础开始,并且可以利用现有组件组装这个系统的大部分。

  1. cuDF 库旨在在 GPU 上实现 Pandas API。它在读取 CSV 文件、过滤和聚合列、连接等标准操作上获得了显著的加速。

    import cudf  # looks and feels like Pandas, but runs on the GPU
    
    df = cudf.read_csv('myfile.csv')
    df = df[df.name == 'Alice']
    df.groupby('id').value.mean()
    

    cuDF 是不断发展的 RAPIDS 项目的一部分。

  2. Dask Dataframe 库围绕 Pandas API 提供了并行算法。它根据任务图将分布式 groupbys 或分布式连接等大型操作分解为许多较小的单节点 groupbys 或连接(以及许多其他操作)。

    import dask.dataframe as dd  # looks and feels like Pandas, but runs in parallel
    
    df = dd.read_csv('myfile.*.csv')
    df = df[df.name == 'Alice']
    df.groupby('id').value.mean().compute()
    
  3. Dask 分布式任务调度器针对复杂的任务图提供了通用的并行执行。它非常适合将多节点计算添加到现有代码库中。

鉴于这些构建模块,我们的方法是使 cuDF API 足够接近 Pandas,以便我们可以重用 Dask Dataframe 算法。

这种方法的优势与挑战

这种方法有几个优势

  1. 我们可以重用最初为 Pandas 设计的 Dask Dataframe 中的并行算法。

  2. 它将开发工作整合到一个代码库中,这样将来在 CPU Dataframe 上花费的精力也能惠及 GPU Dataframe,反之亦然。维护成本得以分摊。

  3. 通过构建与两种 DataFrame 实现(CPU 和 GPU)都能很好协同工作的代码,我们建立了约定和协议,这将使其他项目更容易做到这一点,无论是使用这两个类似 Pandas 的库,还是使用未来的类似 Pandas 的库。

    这种方法还旨在证明生态系统应该支持类似 Pandas 的库,而不仅仅是 Pandas。例如,如果(何时?)Arrow 库开发了计算系统,那么我们将能更好地将其融入进来。

  4. 在进行任何重构时,我们往往会清理现有代码。

    例如,为了让 dask dataframe 支持新的 GPU Parquet 读取器,我们最终重构并简化了我们的 Parquet I/O 逻辑

这种方法也有一些缺点。特别是,它给 cuDF 带来了 API 压力,使其需要与 Pandas 匹配,因此

  1. API 的微小差异现在会导致更大的问题,例如以下几点

  2. cuDF 承受着压力,需要重复一些人认为是 Pandas API 中的错误。

    例如,cuDF 今天对缺失值的支持可以说比 Pandas 更合理。cuDF 是否应该仅仅为了匹配 Pandas 的语义而退回到旧的方式?Dask Dataframe 可能需要更灵活,以便处理演变和语义上的微小差异。

替代方案

我们也可以围绕 cuDF 编写一个新的 dask-dataframe 风格的项目,该项目与 Pandas/Dask Dataframe API 有所不同。直到最近,这实际上一直是采用的方法,dask-cudf 项目正是这样做的。这可能是早期入门和原型开发的好选择。该项目能够使用 dask delayed 实现广泛的功能,包括 groupby-aggregations、 joins 等等。

然而,我们现在正在 Dask dataframe 的基础上重新进行这项工作,这意味着我们正在失去 dask-cudf 已经拥有的一些功能,但希望我们现在添加的功能将更稳定,并建立在更坚实的基础上。

当前状态

目前只有很少的功能可用,但可用的功能都相当流畅。

这是一个简单的示例,它从多个 CSV 文件中读取一些数据,选取一列,并进行一些聚合。

from dask_cuda import LocalCUDACluster
import dask_cudf
from dask.distributed import Client

cluster = LocalCUDACluster()  # runs on eight local GPUs
client = Client(cluster)

gdf = dask_cudf.read_csv('data/nyc/many/*.csv')  # wrap around many CSV files

>>> gdf.passenger_count.sum().compute()
184464740

另请注意,纽约市出租车乘客量比几年前显著减少了

我对上述示例感到兴奋的地方

  • 围绕 cuDF 代码的所有基础设施,如集群设置、诊断、JupyterLab 环境等等,都像任何其他新的 Dask 项目一样,是免费提供的。

    这是我的 JupyterLab 设置图像

    Dask + CUDA + cuDF JupyterLab environment

  • 我们的 df 对象实际上只是一个普通的 Dask Dataframe。我们无需编写新的 __repr____add__.sum() 实现,而且许多我们没有想到的函数今天也能很好地工作(尽管也有许多不行)。

  • 我们紧密集成并与其他系统更紧密地连接。例如,如果我们想将 dask-cudf-dataframe 转换为 dask-pandas-dataframe,我们只需使用 cuDF 的 to_pandas 函数即可

    df = df.map_partitions(cudf.DataFrame.to_pandas)
    

    我们不必编写任何特殊内容,例如单独的 .to_dask_dataframe 方法或处理其他特殊情况。

    Dask 的并行性与选择 CPU 还是 GPU 是正交的。

  • 切换硬件很容易。通过避免单独的 dask-cudf 代码路径,更容易将 cuDF 添加到现有的 Dask+Pandas 代码库中以在 GPU 上运行,或者如果我们希望代码在没有 GPU 的情况下也能运行,则移除 cuDF 并使用 Pandas。

    在下面的扩展部分中有更多示例。

上述示例存在的问题

总的来说,答案是许多小问题

  1. cudf.read_csv 函数尚不支持从单个 CSV 文件分块读取,因此对于非常大的 CSV 文件效果不佳。我们不得不先使用普通的 Dask+Pandas 将大型 CSV 文件拆分成许多小型 CSV 文件

    import dask.dataframe as dd
    (df = dd.read_csv('few-large/*.csv')
            .repartition(npartitions=100)
            .to_csv('many-small/*.csv', index=False))
    

    (参见 rapidsai/cudf #568)

  2. 许多在 dask-cudf 中曾经可用的操作,例如 groupby-aggregations 和 joins,现在不再工作。在接下来的几个月里,我们需要稍微修改许多 cuDF API,使其更紧密地匹配其 Pandas 等价物。

  3. 我运行了两次计时单元,因为它目前导入 cudf 需要几秒钟。 rapidsai/cudf #627

  4. 我们不得不让 Dask Dataframe 更加灵活,并减少对其组成 dataframe 必须完全是 Pandas dataframe 的假设。(参见dask/dask #4359dask/dask #4375 的示例)。我怀疑将来还需要许多这样的小修改。

这些问题代表了数十个类似的问题。它们都是可以修复的,事实上,许多问题目前正由致力于 RAPIDS 的优秀人士积极修复。

近期计划

RAPIDS 团队目前正忙于发布 0.5 版本,其中包括运行上述示例所需的一些修复,以及许多不相关的稳定性改进。这可能会让他们忙碌一到两周,在此期间,我预计除了规划之外,Dask + cuDF 的工作不会有很多进展。

在那之后,Dask 并行性支持将是重中之重,因此我期待看到在此方面取得一些快速进展。

扩展结果

我上一篇关于将 Dask Array 与 CuPy(一个 GPU 加速的 Numpy)结合的博文中,我们看到在处理一些简单的随机数据这个简单问题上,使用多个 GPU 获得了令人印象深刻的加速。

Dask Array + CuPy 处理随机数据

架构 时间
单 CPU 核心 2小时 39分
四十 CPU 核心 11分 30秒
一个 GPU 1分 37秒
八个 GPU 19秒

那个练习很容易扩展,因为它几乎完全受限于创建随机数据的计算。

Dask DataFrame + cuDF 处理 CSV 数据

我们对上述 read_csv 示例进行了类似的研究,该示例主要受限于从磁盘读取 CSV 数据然后解析它。您可以在这里看到可用的 notebook。我们展示了相似(尽管不太令人印象深刻)的数字。

架构 时间 带宽
单 CPU 核心 3分 14秒 50 MB/秒
八 CPU 核心 58秒 170 MB/秒
四十 CPU 核心 35秒 285 MB/秒
一个 GPU 11秒 900 MB/秒
八个 GPU 5秒 2000 MB/秒

带宽数字是通过注意到磁盘上的数据约为 10 GB 计算得出的

分析

首先,我想再次强调,由于所有不同项目之间存在 Pandas API 兼容性,使用此设置可以轻松测试各种架构。我们看到在各种不同硬件上,性能范围很广(40 倍跨度),且成本点各异。

其次,请注意,这个问题在 CPU 和 GPU 上的扩展性都不如我们之前使用 CuPy 的示例。我怀疑这是因为这个示例也受限于 I/O,而不仅仅是计算。虽然从单 CPU 到单 GPU 的飞跃很大,但从单 CPU 到多 CPU 或从单 GPU 到多 GPU 的飞跃不如我们期望的那么大。例如,对于 GPU,当我们添加 8 倍多的 GPU 时,速度提升了大约 2 倍。

起初,人们可能会认为这是因为我们使磁盘读取速度饱和了。然而,有两个证据与这种猜测相悖

  • 熟悉我当前硬件的 NVIDIA 人员告诉我,他们仔细操作时能够获得更高的 I/O 吞吐量
  • CPU 的扩展性同样不佳,尽管它显然没有达到完全的 I/O 带宽

相反,很可能只是我们对磁盘和 I/O 流水线的处理不够仔细。

我们可以考虑更仔细地考虑单台机器内的数据局部性问题。或者,我们也可以选择使用更小的机器,或者许多更小的机器。我的团队一直要求我开始尝试一些比 DGX 更便宜的系统,我可能很快就会进行这方面的实验。对于数据加载和预处理工作负载而言,过去“将尽可能多的计算打包到一台机器中”的经验之谈可能不再适用(除非我们做更多的工作)。

来帮忙

如果您对上述工作感兴趣,那就来帮忙吧!有很多容易实现且影响深远的工作可以做。

如果您有兴趣专注于这些主题并获得报酬,那么请考虑申请一份工作。NVIDIA 的 RAPIDS 团队正在招聘工程师,从事 Dask 与 GPU 的开发以及其他数据分析库的开发项目。


博客评论由 Disqus 提供支持