这项工作得到了 Continuum Analytics 以及 摩尔基金会 的数据驱动发现倡议的支持。

这是一篇简短的博文,旨在鼓励您在 dataframe 计算中使用 Parquet 代替 CSV。我在这里将使用 Dask.dataframe,但 Pandas 同样适用。我在这里也使用我的本地笔记本电脑,但在集群上使用 Parquet 是一个极好的格式。

CSV 很方便,但很慢

我的笔记本电脑上有纽约出租车数据集,存储为 CSV 格式

mrocklin@carbon:~/data/nyc/csv$ ls
yellow_tripdata_2015-01.csv  yellow_tripdata_2015-07.csv
yellow_tripdata_2015-02.csv  yellow_tripdata_2015-08.csv
yellow_tripdata_2015-03.csv  yellow_tripdata_2015-09.csv
yellow_tripdata_2015-04.csv  yellow_tripdata_2015-10.csv
yellow_tripdata_2015-05.csv  yellow_tripdata_2015-11.csv
yellow_tripdata_2015-06.csv  yellow_tripdata_2015-12.csv

这对人类来说是一种方便的格式,因为我们可以直接阅读它。

mrocklin@carbon:~/data/nyc/csv$ head yellow_tripdata_2015-01.csv
VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
2,2015-01-15 19:05:39,2015-01-15
19:23:42,1,1.59,-73.993896484375,40.750110626220703,1,N,-73.974784851074219,40.750617980957031,1,12,1,0.5,3.25,0,0.3,17.05
1,2015-01-10 20:33:38,2015-01-10
20:53:28,1,3.30,-74.00164794921875,40.7242431640625,1,N,-73.994415283203125,40.759109497070313,1,14.5,0.5,0.5,2,0,0.3,17.8
1,2015-01-10 20:33:38,2015-01-10
20:43:41,1,1.80,-73.963340759277344,40.802787780761719,1,N,-73.951820373535156,40.824413299560547,2,9.5,0.5,0.5,0,0,0.3,10.8
1,2015-01-10 20:33:39,2015-01-10
20:35:31,1,.50,-74.009086608886719,40.713817596435547,1,N,-74.004325866699219,40.719985961914063,2,3.5,0.5,0.5,0,0,0.3,4.8
1,2015-01-10 20:33:39,2015-01-10
20:52:58,1,3.00,-73.971176147460938,40.762428283691406,1,N,-74.004180908203125,40.742652893066406,2,15,0.5,0.5,0,0,0.3,16.3
1,2015-01-10 20:33:39,2015-01-10
20:53:52,1,9.00,-73.874374389648438,40.7740478515625,1,N,-73.986976623535156,40.758193969726563,1,27,0.5,0.5,6.7,5.33,0.3,40.33
1,2015-01-10 20:33:39,2015-01-10
20:58:31,1,2.20,-73.9832763671875,40.726009368896484,1,N,-73.992469787597656,40.7496337890625,2,14,0.5,0.5,0,0,0.3,15.3
1,2015-01-10 20:33:39,2015-01-10
20:42:20,3,.80,-74.002662658691406,40.734142303466797,1,N,-73.995010375976563,40.726325988769531,1,7,0.5,0.5,1.66,0,0.3,9.96
1,2015-01-10 20:33:39,2015-01-10
21:11:35,3,18.20,-73.783042907714844,40.644355773925781,2,N,-73.987594604492187,40.759357452392578,2,52,0,0.5,0,5.33,0.3,58.13

我们可以使用 Pandas 或 Dask.dataframe 等工具读取所有这些数据。由于数据相当大,我将使用 Dask.dataframe

mrocklin@carbon:~/data/nyc/csv$ du -hs .
22G .
In [1]: import dask.dataframe as dd

In [2]: %time df = dd.read_csv('yellow_tripdata_2015-*.csv')
CPU times: user 340 ms, sys: 12 ms, total: 352 ms
Wall time: 377 ms

In [3]: df.head()
Out[3]:
VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  \
0         2  2015-01-15 19:05:39   2015-01-15 19:23:42                1
1         1  2015-01-10 20:33:38   2015-01-10 20:53:28                1
2         1  2015-01-10 20:33:38   2015-01-10 20:43:41                1
3         1  2015-01-10 20:33:39   2015-01-10 20:35:31                1
4         1  2015-01-10 20:33:39   2015-01-10 20:52:58                1

   trip_distance  pickup_longitude  pickup_latitude  RateCodeID  \
0           1.59        -73.993896        40.750111           1
1           3.30        -74.001648        40.724243           1
2           1.80        -73.963341        40.802788           1
3           0.50        -74.009087        40.713818           1
4           3.00        -73.971176        40.762428           1

  store_and_fwd_flag  dropoff_longitude  dropoff_latitude  payment_type \
0                  N         -73.974785         40.750618             1
1                  N         -73.994415         40.759109             1
2                  N         -73.951820         40.824413             2
3                  N         -74.004326         40.719986             2
4                  N         -74.004181         40.742653             2

   fare_amount  extra  mta_tax  tip_amount  tolls_amount  \
0         12.0    1.0      0.5        3.25           0.0
1         14.5    0.5      0.5        2.00           0.0
2          9.5    0.5      0.5        0.00           0.0
3          3.5    0.5      0.5        0.00           0.0
4         15.0    0.5      0.5        0.00           0.0

   improvement_surcharge  total_amount
0                    0.3         17.05
1                    0.3         17.80
2                    0.3         10.80
3                    0.3          4.80
4                    0.3         16.30

In [4]: from dask.diagnostics import ProgressBar

In [5]: ProgressBar().register()

In [6]: df.passenger_count.sum().compute()
[########################################] | 100% Completed |
3min 58.8s
Out[6]: 245566747

尽管这些数据太大无法完全加载到内存中,但我们仍然能够就此数据提出问题(并得知 2016 年有 2.5 亿人乘坐出租车)。这是因为 Dask 能够从磁盘惰性地操作。它在需要时读取数据,并在不再需要时将其遗忘。这需要一些时间(4 分钟),但确实有效。

然而,当我们多次从磁盘读取这些数据时,我们开始对这四分钟的成本感到沮丧。在 Pandas 中,我们将数据从磁盘移动到内存时只承受一次这种成本。在没有足够内存的更大数据集上,我们会多次承受这种成本。

Parquet 更快

让我们用 Parquet 尝试相同的过程。我碰巧在我的硬盘上存储了完全相同的数据,格式为 Parquet。

mrocklin@carbon:~/data/nyc$ du -hs nyc-2016.parquet/
17G nyc-2016.parquet/

它存储为一堆独立的文件,但我们实际上并不关心这一点。我们将始终将目录视为数据集。这些文件以二进制格式存储。我们人类无法阅读它们

mrocklin@carbon:~/data/nyc$ head nyc-2016.parquet/part.0.parquet
<a bunch of illegible bytes>

但计算机更能够读取和导航这些数据。让我们做和之前一样的实验

In [1]: import dask.dataframe as dd

In [2]: df = dd.read_parquet('nyc-2016.parquet/')

In [3]: df.head()
Out[3]:
  tpep_pickup_datetime  VendorID tpep_dropoff_datetime  passenger_count  \
0  2015-01-01 00:00:00         2   2015-01-01 00:00:00                3
1  2015-01-01 00:00:00         2   2015-01-01 00:00:00                1
2  2015-01-01 00:00:00         1   2015-01-01 00:11:26                5
3  2015-01-01 00:00:01         1   2015-01-01 00:03:49                1
4  2015-01-01 00:00:03         2   2015-01-01 00:21:48                2

    trip_distance  pickup_longitude  pickup_latitude  RateCodeID  \
0           1.56        -74.001320        40.729057           1
1           1.68        -73.991547        40.750069           1
2           4.00        -73.971436        40.760201           1
3           0.80        -73.860847        40.757294           1
4           2.57        -73.969017        40.754269           1

  store_and_fwd_flag  dropoff_longitude  dropoff_latitude  payment_type  \
0                  N         -74.010208         40.719662             1
1                  N           0.000000          0.000000             2
2                  N         -73.921181         40.768269             2
3                  N         -73.868111         40.752285             2
4                  N         -73.994133         40.761600             2

   fare_amount  extra  mta_tax  tip_amount  tolls_amount  \
0          7.5    0.5      0.5         0.0           0.0
1         10.0    0.0      0.5         0.0           0.0
2         13.5    0.5      0.5         0.0           0.0
3          5.0    0.5      0.5         0.0           0.0
4         14.5    0.5      0.5         0.0           0.0

   improvement_surcharge  total_amount
0                    0.3           8.8
1                    0.3          10.8
2                    0.0          14.5
3                    0.0           6.3
4                    0.3          15.8

In [4]: from dask.diagnostics import ProgressBar

In [5]: ProgressBar().register()

In [6]: df.passenger_count.sum().compute()
[########################################] | 100% Completed |
2.8s
Out[6]: 245566747

值相同,但现在我们的计算在三秒内完成,而不是四分钟。我们在这里稍微有点“作弊”(对于 Parquet 来说,提取乘客数量列特别容易),但一般来说,Parquet 会比 CSV 快 很多。这使我们可以舒适地从磁盘工作,而无需担心我们有多少内存。

转换

所以,帮自己一个忙,转换你的数据

In [1]: import dask.dataframe as dd
In [2]: df = dd.read_csv('csv/yellow_tripdata_2015-*.csv')
In [3]: from dask.diagnostics import ProgressBar
In [4]: ProgressBar().register()
In [5]: df.to_parquet('yellow_tripdata.parquet')
[############                            ] | 30% Completed |  1min 54.7s

如果你想更聪明,可以在转换时指定 dtypes 和压缩。这绝对可以帮助你获得显著的加速,但即使只使用默认设置,仍然会带来很大的改进。

优点

Parquet 支持以下特性

  1. 数据的二进制表示,允许将磁盘上的字节快速转换为内存中的字节
  2. 列式存储,意味着您可以只加载所需的列,而无需加载整个数据集
  3. 行分块存储,以便您可以从特定范围中提取数据,而无需触及其他部分
  4. 每块统计信息,以便您可以快速查找子集
  5. 压缩

Parquet 版本

有两个很棒的支持 Parquet 格式的 Python 包

  1. pyarrow: Apache Arrow 和 Apache Parquet C++ 库的 Python 绑定
  2. fastparquet: Parquet 格式的直接 NumPy + Numba 实现

两者都很好。两者都能做大多数事情。各自有不同的优势。上面的代码默认使用了 fastparquet,但如果需要,您可以在 Dask 中使用 engine='arrow' 关键字进行更改。


博客评论由 Disqus 提供支持