这项工作得到了 Continuum AnalyticsXDATA 项目的支持,是 Blaze 项目的一部分

本文主要面向开发者。内容涉及仍在实验阶段的代码,尚未准备好供用户使用。

太长不看 (tl;dr) 我们能构建 dask.frame 吗?一种方法涉及索引和大量洗牌(重排)。

Dask 数组进展

在过去的两个月里,我们见证了 dask (一个任务调度规范)和 dask.array (一个旨在使用分块算法实现外存 nd 数组的项目)的创建。(博文:1, 2, 3, 4, 5, 6)。这进展得很顺利。dask.array 已在主要的 conda 通道和 PyPI 上可用,并且在大多数情况下,它是 NumPy 部分操作的愉快直接替代品。我对此非常满意。

conda install dask
or
pip install dask

虽然仍有工作要做,特别是我想与遇到实际问题的人交流,但总的来说,dask.array 感觉已经准备就绪。

接下来是 dask frames

我们能为 Pandas 做我们刚刚为 NumPy 做的事情吗?

问题:我们能否将大型 DataFrame 表示为一系列内存中的 DataFrame,并使用任务调度执行大多数 Pandas 操作?

回答:我不知道。我们来试试。

朴素方法

如果我们将 dask.array 表示为 NumPy ndarray 的 N 维网格,那么也许我们应该将 dask.frame 表示为 Pandas DataFrame 的一维网格;它们有点像数组。

dask.array 朴素的 dask.frame

这种方法支持以下操作

  • 元素级操作 df.a + df.b
  • 按行过滤 df[df.a > 0]
  • 聚合操作 df.a.mean()
  • 一些与标准聚合操作结合的拆分-应用-合并(split-apply-combine)操作,例如 df.groupby('a').b.mean()。本质上,任何你可以用 df.groupby(...).agg(...) 完成的操作。

聚合操作和拆分-应用-合并操作需要一些巧妙的处理。这就是 Blaze 现在的工作方式,也是它在这些笔记本中进行外存操作的方式:Blaze 和 CSV 文件Blaze 和二进制存储

然而,这种方法不支持以下操作

  • 连接 (Joins)
  • 带有更复杂的 transformapply 合并步骤的拆分-应用-合并操作
  • 滑动窗口或重采样操作
  • 涉及多个数据集的任何操作

基于索引值进行分区

我们不基于块的大小进行分区,而是基于索引的值范围进行分区。

基于块大小进行分区 基于索引值进行分区

这开启了一些额外的操作可能性

  • 当两个表共享相同的索引时,连接操作是可能的。因为我们拥有索引值的信息,所以我们知道一侧的哪些块需要与另一侧的哪些块进行通信。
  • 当分组键是索引时,带有 transform/apply 步骤的拆分-应用-合并操作是可能的。在这种情况下,我们保证每个组都在同一个块中。这开启了通用的 df.groupby(...).apply(...)
  • 如果像 dask.array 中的 ghosting operations 那样在块之间共享少量信息,那么对索引执行滚动或重采样操作就很简单。

我们注意到以下主题

如果逻辑与索引对齐,复杂操作就变得容易

因此,许多复杂操作的秘诀变成了

  1. 根据适当的列重新索引数据
  2. 执行简单的计算

对外存数据重新索引

明确地说,想象我们有一个大型交易时间序列,按时间索引并按天分区。每天的数据都在一个单独的 DataFrame 中。

Block 1
-------
                     credit    name
time
2014-01-01 00:00:00     100     Bob
2014-01-01 01:00:00     200   Edith
2014-01-01 02:00:00    -300   Alice
2014-01-01 03:00:00     400     Bob
2014-01-01 04:00:00    -500  Dennis
...

Block 2
-------
                     credit    name
time
2014-01-02 00:00:00     300    Andy
2014-01-02 01:00:00     200   Edith
...

我们想要重新索引这些数据并洗牌(重排)所有条目,以便现在我们按人的名字进行分区。也许所有名字为 A 的都在一个块中,而所有名字为 B 的都在另一个块中。

Block 1
-------
                       time  credit
name
Alice   2014-04-30 00:00:00     400
Alice   2014-01-01 00:00:00     100
Andy    2014-11-12 00:00:00    -200
Andy    2014-01-18 00:00:00     400
Andy    2014-02-01 00:00:00    -800
...

Block 2
-------
                       time  credit
name
Bob     2014-02-11 00:00:00     300
Bob     2014-01-05 00:00:00     100
...

重新索引和洗牌大型数据是困难且昂贵的。我们需要找到合适的值来对数据进行分区,以便获得大小均匀且能很好地放入内存中的块。我们还需要将所有原始块中的条目洗牌到所有新块中。原则上,每个旧块都对每个新块有所贡献。

我们不能直接调用 DataFrame.sort,因为整个数据可能无法放入内存,而且我们大多数的排序算法都假定可以随机访问。

我们分两步进行

  1. 找到好的分割值来对数据进行分区。这些值应将数据分割成大小大致相等的块。
  2. 根据第一步找到的新分区,将旧块洗牌(重排)到新块中。

通过外部排序查找分割值

找到新分区值的一种方法是从每个块中提取新的索引列,执行外存排序,然后从该数组中选取间隔均匀的值。

  1. 从每个块中提取新的索引列

    indexes = [block['new-column-index'] for block in blocks]
    
  2. 对该列执行 外存排序

    sorted_index = fancy_out_of_core_sort(indexes)
    
  3. 选取间隔均匀的值,例如

    partition_values = sorted_index[::1000000]
    

我们使用并行块内排序实现这一点,然后使用 heapq 模块进行流式合并。它有效,但速度较慢。

可能的改进

可以通过以下选项之一来加速此过程

  1. 一个直接作用于 NumPy 数组迭代器的流式数值解法(有人知道 numtoolz 吗?)
  2. 完全不进行排序。我们实际上只需要近似的、间隔均匀的分位数。简短的文献检索表明可能存在一些好的解决方案。

洗牌(重排)

既然我们知道了要基于哪些值进行分区,我们要求每个块将自身分成适当的碎片,并将所有这些碎片放入一个溢出到磁盘的字典中。然后,另一个进程会拾取这些碎片,并调用 pd.concat 将它们合并到新的块中。

对于外存字典,我们目前使用 Chest。事实证明,序列化 DataFrames 并将其写入磁盘可能很棘手。有几种不错的方法,它们之间的性能差异约为一个数量级。

这有效,但我的实现速度较慢

这是一个使用 NYCTaxi 数据片段的示例(这部分数据量很小)

In [1]: import dask.frame as dfr

In [2]: d = dfr.read_csv('/home/mrocklin/data/trip-small.csv', chunksize=10000)

In [3]: d.head(3)   # This is fast
Out[3]:
                          medallion                      hack_license  \
0  89D227B655E5C82AECF13C3F540D4CF4  BA96DE419E711691B9445D6A6307C170
1  0BD7C8F5BA12B88E0B67BED28BEA73D8  9FD8F69F0804BDB5549F40E9DA1BE472
2  0BD7C8F5BA12B88E0B67BED28BEA73D8  9FD8F69F0804BDB5549F40E9DA1BE472

  vendor_id  rate_code store_and_fwd_flag      pickup_datetime  \
0       CMT          1                  N  2013-01-01 15:11:48
1       CMT          1                  N  2013-01-06 00:18:35
2       CMT          1                  N  2013-01-05 18:49:41

      dropoff_datetime  passenger_count  trip_time_in_secs  trip_distance  \
0  2013-01-01 15:18:10                4                382            1.0
1  2013-01-06 00:22:54                1                259            1.5
2  2013-01-05 18:54:23                1                282            1.1

   pickup_longitude  pickup_latitude  dropoff_longitude  dropoff_latitude
0        -73.978165        40.757977         -73.989838         40.751171
1        -74.006683        40.731781         -73.994499         40.750660
2        -74.004707        40.737770         -74.009834         40.726002

In [4]: d2 = d.set_index(d.passenger_count, out_chunksize=10000)   # This takes some time

In [5]: d2.head(3)
Out[5]:
                                        medallion  \
passenger_count
0                3F3AC054811F8B1F095580C50FF16090
1                4C52E48F9E05AA1A8E2F073BB932E9AA
1                FF00E5D4B15B6E896270DDB8E0697BF7

                                     hack_license vendor_id  rate_code  \
passenger_count
0                E00BD74D8ADB81183F9F5295DC619515       VTS          5
1                307D1A2524E526EE08499973A4F832CF       VTS          1
1                0E8CCD187F56B3696422278EBB620EFA       VTS          1

                store_and_fwd_flag      pickup_datetime     dropoff_datetime  \
passenger_count
0                              NaN  2013-01-13 03:25:00  2013-01-13 03:42:00
1                              NaN  2013-01-13 16:12:00  2013-01-13 16:23:00
1                              NaN  2013-01-13 15:05:00  2013-01-13 15:15:00

                 passenger_count  trip_time_in_secs  trip_distance  \
passenger_count
0                              0               1020           5.21
1                              1                660           2.94
1                              1                600           2.18

                 pickup_longitude  pickup_latitude  dropoff_longitude  \
passenger_count
0                      -73.986900        40.743736         -74.029747
1                      -73.976753        40.790123         -73.984802
1                      -73.982719        40.767147         -73.982170

                 dropoff_latitude
passenger_count
0                       40.741348
1                       40.758518
1                       40.746170

In [6]: d2.blockdivs  # our new partition values
Out[6]: (2, 3, 6)

In [7]: d.blockdivs   # our original partition values
Out[7]: (10000, 20000, 30000, 40000, 50000, 60000, 70000, 80000, 90000)

一些问题

  • 首先,我们必须在执行过程中不断评估 dask。每个 set_index 操作(以及因此产生的许多 groupbys 和 joins)都会强制进行评估。我们不能再像在 dask.array 的情况下那样,无休止地组合高级操作以形成越来越复杂的图,然后只在最后进行评估。我们需要边执行边评估。

  • 排序/洗牌(重排)速度慢。这有几个原因,包括 DataFrames 的序列化以及排序本身就很困难。

  • 频繁地对大量数据进行重新索引的可行性如何?我们何时会达到“直接使用数据库”的阶段?

  • Pandas 尚未释放 GIL,所以这完全是单核操作。请参阅关于 PyData 和 GIL 的文章。

  • 我当前的解决方案缺乏基本功能。我跳过了容易的部分,以便首先确保困难的部分是可行的。

求助

我表格方面的知识不如数组多。我对这个领域的文献和常用解决方案并不了解。如果这里的任何内容看起来可疑,那么请务必提出。我非常需要您的帮助。

此外,Pandas API 比 NumPy 的复杂得多。如果有经验的开发者愿意介入并以分块的方式实现一些相当直接的 Pandas 功能,我将不胜感激。


博客评论由 Disqus 提供