使用 Apache Parquet
这项工作得到了 Continuum Analytics 以及 摩尔基金会 的数据驱动发现倡议的支持。
这是一篇简短的博文,旨在鼓励您在 dataframe 计算中使用 Parquet 代替 CSV。我在这里将使用 Dask.dataframe,但 Pandas 同样适用。我在这里也使用我的本地笔记本电脑,但在集群上使用 Parquet 是一个极好的格式。
CSV 很方便,但很慢
我的笔记本电脑上有纽约出租车数据集,存储为 CSV 格式
mrocklin@carbon:~/data/nyc/csv$ ls
yellow_tripdata_2015-01.csv yellow_tripdata_2015-07.csv
yellow_tripdata_2015-02.csv yellow_tripdata_2015-08.csv
yellow_tripdata_2015-03.csv yellow_tripdata_2015-09.csv
yellow_tripdata_2015-04.csv yellow_tripdata_2015-10.csv
yellow_tripdata_2015-05.csv yellow_tripdata_2015-11.csv
yellow_tripdata_2015-06.csv yellow_tripdata_2015-12.csv
这对人类来说是一种方便的格式,因为我们可以直接阅读它。
mrocklin@carbon:~/data/nyc/csv$ head yellow_tripdata_2015-01.csv
VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
2,2015-01-15 19:05:39,2015-01-15
19:23:42,1,1.59,-73.993896484375,40.750110626220703,1,N,-73.974784851074219,40.750617980957031,1,12,1,0.5,3.25,0,0.3,17.05
1,2015-01-10 20:33:38,2015-01-10
20:53:28,1,3.30,-74.00164794921875,40.7242431640625,1,N,-73.994415283203125,40.759109497070313,1,14.5,0.5,0.5,2,0,0.3,17.8
1,2015-01-10 20:33:38,2015-01-10
20:43:41,1,1.80,-73.963340759277344,40.802787780761719,1,N,-73.951820373535156,40.824413299560547,2,9.5,0.5,0.5,0,0,0.3,10.8
1,2015-01-10 20:33:39,2015-01-10
20:35:31,1,.50,-74.009086608886719,40.713817596435547,1,N,-74.004325866699219,40.719985961914063,2,3.5,0.5,0.5,0,0,0.3,4.8
1,2015-01-10 20:33:39,2015-01-10
20:52:58,1,3.00,-73.971176147460938,40.762428283691406,1,N,-74.004180908203125,40.742652893066406,2,15,0.5,0.5,0,0,0.3,16.3
1,2015-01-10 20:33:39,2015-01-10
20:53:52,1,9.00,-73.874374389648438,40.7740478515625,1,N,-73.986976623535156,40.758193969726563,1,27,0.5,0.5,6.7,5.33,0.3,40.33
1,2015-01-10 20:33:39,2015-01-10
20:58:31,1,2.20,-73.9832763671875,40.726009368896484,1,N,-73.992469787597656,40.7496337890625,2,14,0.5,0.5,0,0,0.3,15.3
1,2015-01-10 20:33:39,2015-01-10
20:42:20,3,.80,-74.002662658691406,40.734142303466797,1,N,-73.995010375976563,40.726325988769531,1,7,0.5,0.5,1.66,0,0.3,9.96
1,2015-01-10 20:33:39,2015-01-10
21:11:35,3,18.20,-73.783042907714844,40.644355773925781,2,N,-73.987594604492187,40.759357452392578,2,52,0,0.5,0,5.33,0.3,58.13
我们可以使用 Pandas 或 Dask.dataframe 等工具读取所有这些数据。由于数据相当大,我将使用 Dask.dataframe
mrocklin@carbon:~/data/nyc/csv$ du -hs .
22G .
In [1]: import dask.dataframe as dd
In [2]: %time df = dd.read_csv('yellow_tripdata_2015-*.csv')
CPU times: user 340 ms, sys: 12 ms, total: 352 ms
Wall time: 377 ms
In [3]: df.head()
Out[3]:
VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count \
0 2 2015-01-15 19:05:39 2015-01-15 19:23:42 1
1 1 2015-01-10 20:33:38 2015-01-10 20:53:28 1
2 1 2015-01-10 20:33:38 2015-01-10 20:43:41 1
3 1 2015-01-10 20:33:39 2015-01-10 20:35:31 1
4 1 2015-01-10 20:33:39 2015-01-10 20:52:58 1
trip_distance pickup_longitude pickup_latitude RateCodeID \
0 1.59 -73.993896 40.750111 1
1 3.30 -74.001648 40.724243 1
2 1.80 -73.963341 40.802788 1
3 0.50 -74.009087 40.713818 1
4 3.00 -73.971176 40.762428 1
store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type \
0 N -73.974785 40.750618 1
1 N -73.994415 40.759109 1
2 N -73.951820 40.824413 2
3 N -74.004326 40.719986 2
4 N -74.004181 40.742653 2
fare_amount extra mta_tax tip_amount tolls_amount \
0 12.0 1.0 0.5 3.25 0.0
1 14.5 0.5 0.5 2.00 0.0
2 9.5 0.5 0.5 0.00 0.0
3 3.5 0.5 0.5 0.00 0.0
4 15.0 0.5 0.5 0.00 0.0
improvement_surcharge total_amount
0 0.3 17.05
1 0.3 17.80
2 0.3 10.80
3 0.3 4.80
4 0.3 16.30
In [4]: from dask.diagnostics import ProgressBar
In [5]: ProgressBar().register()
In [6]: df.passenger_count.sum().compute()
[########################################] | 100% Completed |
3min 58.8s
Out[6]: 245566747
尽管这些数据太大无法完全加载到内存中,但我们仍然能够就此数据提出问题(并得知 2016 年有 2.5 亿人乘坐出租车)。这是因为 Dask 能够从磁盘惰性地操作。它在需要时读取数据,并在不再需要时将其遗忘。这需要一些时间(4 分钟),但确实有效。
然而,当我们多次从磁盘读取这些数据时,我们开始对这四分钟的成本感到沮丧。在 Pandas 中,我们将数据从磁盘移动到内存时只承受一次这种成本。在没有足够内存的更大数据集上,我们会多次承受这种成本。
Parquet 更快
让我们用 Parquet 尝试相同的过程。我碰巧在我的硬盘上存储了完全相同的数据,格式为 Parquet。
mrocklin@carbon:~/data/nyc$ du -hs nyc-2016.parquet/
17G nyc-2016.parquet/
它存储为一堆独立的文件,但我们实际上并不关心这一点。我们将始终将目录视为数据集。这些文件以二进制格式存储。我们人类无法阅读它们
mrocklin@carbon:~/data/nyc$ head nyc-2016.parquet/part.0.parquet
<a bunch of illegible bytes>
但计算机更能够读取和导航这些数据。让我们做和之前一样的实验
In [1]: import dask.dataframe as dd
In [2]: df = dd.read_parquet('nyc-2016.parquet/')
In [3]: df.head()
Out[3]:
tpep_pickup_datetime VendorID tpep_dropoff_datetime passenger_count \
0 2015-01-01 00:00:00 2 2015-01-01 00:00:00 3
1 2015-01-01 00:00:00 2 2015-01-01 00:00:00 1
2 2015-01-01 00:00:00 1 2015-01-01 00:11:26 5
3 2015-01-01 00:00:01 1 2015-01-01 00:03:49 1
4 2015-01-01 00:00:03 2 2015-01-01 00:21:48 2
trip_distance pickup_longitude pickup_latitude RateCodeID \
0 1.56 -74.001320 40.729057 1
1 1.68 -73.991547 40.750069 1
2 4.00 -73.971436 40.760201 1
3 0.80 -73.860847 40.757294 1
4 2.57 -73.969017 40.754269 1
store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type \
0 N -74.010208 40.719662 1
1 N 0.000000 0.000000 2
2 N -73.921181 40.768269 2
3 N -73.868111 40.752285 2
4 N -73.994133 40.761600 2
fare_amount extra mta_tax tip_amount tolls_amount \
0 7.5 0.5 0.5 0.0 0.0
1 10.0 0.0 0.5 0.0 0.0
2 13.5 0.5 0.5 0.0 0.0
3 5.0 0.5 0.5 0.0 0.0
4 14.5 0.5 0.5 0.0 0.0
improvement_surcharge total_amount
0 0.3 8.8
1 0.3 10.8
2 0.0 14.5
3 0.0 6.3
4 0.3 15.8
In [4]: from dask.diagnostics import ProgressBar
In [5]: ProgressBar().register()
In [6]: df.passenger_count.sum().compute()
[########################################] | 100% Completed |
2.8s
Out[6]: 245566747
值相同,但现在我们的计算在三秒内完成,而不是四分钟。我们在这里稍微有点“作弊”(对于 Parquet 来说,提取乘客数量列特别容易),但一般来说,Parquet 会比 CSV 快 很多。这使我们可以舒适地从磁盘工作,而无需担心我们有多少内存。
转换
所以,帮自己一个忙,转换你的数据
In [1]: import dask.dataframe as dd
In [2]: df = dd.read_csv('csv/yellow_tripdata_2015-*.csv')
In [3]: from dask.diagnostics import ProgressBar
In [4]: ProgressBar().register()
In [5]: df.to_parquet('yellow_tripdata.parquet')
[############ ] | 30% Completed | 1min 54.7s
如果你想更聪明,可以在转换时指定 dtypes 和压缩。这绝对可以帮助你获得显著的加速,但即使只使用默认设置,仍然会带来很大的改进。
优点
Parquet 支持以下特性
- 数据的二进制表示,允许将磁盘上的字节快速转换为内存中的字节
- 列式存储,意味着您可以只加载所需的列,而无需加载整个数据集
- 行分块存储,以便您可以从特定范围中提取数据,而无需触及其他部分
- 每块统计信息,以便您可以快速查找子集
- 压缩
Parquet 版本
有两个很棒的支持 Parquet 格式的 Python 包
- pyarrow: Apache Arrow 和 Apache Parquet C++ 库的 Python 绑定
- fastparquet: Parquet 格式的直接 NumPy + Numba 实现
两者都很好。两者都能做大多数事情。各自有不同的优势。上面的代码默认使用了 fastparquet
,但如果需要,您可以在 Dask 中使用 engine='arrow'
关键字进行更改。
博客评论由 Disqus 提供支持