使用 Dask 进行按组聚合

本文我们将深入探讨 Dask 如何计算 groupby 聚合。这些操作在 ETL 和数据分析中很常用,其过程是将数据拆分成组,独立地对每组应用一个函数,然后将结果合并在一起。在 PyData/R 社区,这通常被称为 split-apply-combine(拆分-应用-合并)策略(由 Hadley Wickham 最早提出),并在 Pandas 生态系统中广泛使用。

图片来源:swcarpentry.github.io

Dask 借鉴了这一思想,并使用了类似 catchy(朗朗上口)的名字:apply-concat-apply(应用-连接-应用),简称 aca。在本文中,我们将探讨 aca 策略在简单和复杂操作中的应用。

首先,回顾一下 Dask DataFrame 是 DataFrame 对象的集合(例如,Dask DataFrame 的每个分区都是一个 Pandas DataFrame)。例如,假设我们有以下 Pandas DataFrame:

>>> import pandas as pd
>>> df = pd.DataFrame(dict(a=[1, 1, 2, 3, 3, 1, 1, 2, 3, 3, 99, 10, 1],
...                        b=[1, 3, 10, 3, 2, 1, 3, 10, 3, 3, 12, 0, 9],
...                        c=[2, 4, 5, 2, 3, 5, 2, 3, 9, 2, 44, 33, 2]))
>>> df
     a   b   c
0    1   1   2
1    1   3   4
2    2  10   5
3    3   3   2
4    3   2   3
5    1   1   5
6    1   3   2
7    2  10   3
8    3   3   9
9    3   3   2
10  99  12  44
11  10   0  33
12   1   9   2

要从这些数据创建一个包含三个分区的 Dask DataFrame,我们可以按索引范围 (0, 4)、(5, 9) 和 (10, 12) 对 df 进行分区。我们可以使用 Dask 的 from_pandas 函数并设置 npartitions=3 来完成此分区。

>>> import dask.dataframe as dd
>>> ddf = dd.from_pandas(df, npartitions=3)

这 3 个分区就是 3 个独立的 Pandas DataFrame

>>> ddf.partitions[0].compute()
   a   b  c
0  1   1  2
1  1   3  4
2  2  10  5
3  3   3  2
4  3   2  3

应用-连接-应用

当 Dask 对 Dask DataFrame 应用函数和/或算法(例如 summean 等)时,它会独立地将该操作应用于所有组成它的分区,将输出收集(或连接)成中间结果,然后再次将操作应用于中间结果以产生最终结果。在内部,Dask 对许多内部 DataFrame 计算都重用了相同的 apply-concat-apply 方法。

让我们通过 aca 过程的每个步骤来分解 Dask 如何计算 ddf.groupby(['a', 'b']).c.sum()。我们将首先把 df 这个 Pandas DataFrame 拆分成三个分区。

>>> df_1 = df[:5]
>>> df_2 = df[5:10]
>>> df_3 = df[-3:]

应用

接下来,我们对三个分区分别执行相同的 groupby(['a', 'b']).c.sum() 操作。

>>> sr1 = df_1.groupby(['a', 'b']).c.sum()
>>> sr2 = df_2.groupby(['a', 'b']).c.sum()
>>> sr3 = df_3.groupby(['a', 'b']).c.sum()

这些操作都会产生一个带有多级索引(MultiIndex)的 Series。

>>> sr1
a  b
1  1     2
   3     4
2  10    5
3  2     3
   3     2
Name: c, dtype: int64
      
>>> sr2
a  b
1  1      5
   3      2
2  10     3
3  3     11
Name: c, dtype: int64
      
>>> sr3
a   b
1   9      2
10  0     33
99  12    44
Name: c, dtype: int64
      

连接!

在第一次 apply 之后,下一步是将中间结果 sr1sr2sr3 连接起来。使用 Pandas 的 concat 函数可以很直接地完成这一步。

>>> sr_concat = pd.concat([sr1, sr2, sr3])
>>> sr_concat
a   b
1   1      2
    3      4
2   10     5
3   2      3
    3      2
1   1      5
    3      2
2   10     3
3   3     11
1   9      2
10  0     33
99  12    44
Name: c, dtype: int64

再次应用

我们的最后一步是再次对连接后的 sr_concat Series 应用相同的 groupby(['a', 'b']).c.sum() 操作。但我们不再拥有列 ab,那我们该如何继续呢?

退一步来看,我们的目标是将具有相同索引的列中的值相加。例如,有两个行的索引是 (1, 1),对应的值分别是 2 和 5。那么,我们如何对具有相同值的索引进行 groupby 呢?MultiIndex 使用层级(levels)来定义给定索引处的值。Dask 在 apply-concat-apply 计算的最终应用步骤中确定使用这些层级。在我们的例子中,层级是 [0, 1],也就是说,我们想要第 0 层和第 1 层的索引,如果我们按这两层进行分组(0, 1),我们就有效地将相同的索引组合在一起了。

>>> total = sr_concat.groupby(level=[0, 1]).sum()
>>> total
a   b
1   1      7
    3      6
    9      2
2   10     8
3   2      3
    3     13
10  0     33
99  12    44
Name: c, dtype: int64
      
>>> ddf.groupby(['a', 'b']).c.sum().compute()
a   b
1   1      7
    3      6
2   10     8
3   2      3
    3     13
1   9      2
10  0     33
99  12    44
Name: c, dtype: int64
      
>>> df.groupby(['a', 'b']).c.sum()
a   b
1   1      7
    3      6
    9      2
2   10     8
3   2      3
    3     13
10  0     33
99  12    44
Name: c, dtype: int64
      

此外,我们可以很容易地检查这个 apply-concat-apply 计算的步骤,通过可视化计算的任务图

>>> ddf.groupby(['a', 'b']).c.sum().visualize()

sum 是一个相当直接的计算。对于像 mean 这样稍微复杂一些的操作怎么办?

>>> ddf.groupby(['a', 'b']).c.mean().visualize()

Mean(均值)是无法直接套用 aca 模型的一个好例子——将 mean 值连接起来并再次计算 mean 会得到错误的结果。就像任何计算模式一样:向量化、Map/Reduce 等,有时我们需要创造性地调整计算方式以适应模式。在 aca 的情况下,我们通常可以将计算分解为组成部分。对于 mean,这将是 sum(求和)和 count(计数)。

\[\bar{x} = \frac{x_1+x_2+\cdots +x_n}{n}\]

从上面的任务图中可以看出,每个分区都有两个独立任务:series-groupby-count-chunkseries-groupby-sum-chunk。然后结果被聚合成两个最终节点:series-groupby-count-aggseries-groupby-sum-agg,最后我们计算均值:total sum / total count


博客评论由 Disqus 提供支持