Dask 开发日志
这项工作得到了 Continuum Analytics 以及来自 Moore Foundation 的 Data Driven Discovery Initiative 的支持。
为了提高透明度,我每周(大概)都会撰写博客,介绍过去一周在 Dask 及相关项目上完成的工作。本日志涵盖 2017-04-20 至 2017-04-28 期间完成的工作。此处提及的内容均尚未准备好用于生产环境。这篇博文是仓促写成的,因此请勿期望其文字经过精心打磨。
上周在 Dask 及相关项目中的开发包括以下值得关注的变化:
- 改进了 Joblib 支持,加速了现有 Scikit-Learn 代码
- 基于 dask-glm 的 LogisticRegression 估计器,与 scikit-learn 兼容
- 通过 Arrow 增加了对 Parquet 的支持
- 稀疏数组
- 改进了溢出到磁盘的行为
- 兼容 AsyncIO 的客户端
- TLS (SSL) 支持
- NumPy
__array_ufunc__
协议
Joblib
Scikit learn 使用 Joblib 对其大部分算法进行并行化处理,Joblib 为简单并行计算提供了简单的接口。Dask 已经能够 “劫持” Joblib 代码并充当其后端一段时间了,但它有一些限制,特别是我们必须为每一批计算反复将数据从工作节点发送回客户端。
import distributed.joblib
from joblib import Parallel, parallel_backend
with parallel_backend('dask.distributed', scheduler_host='HOST:PORT'):
# normal Joblib code
现在新增了一个 scatter=
关键字,允许你预先将选定的变量分散到所有 Dask 工作节点上。这显著降低了开销,尤其是在机器学习工作负载中,因为大部分数据变化不大。
# Send the training data only once to each worker
with parallel_backend('dask.distributed', scheduler_host='localhost:8786',
scatter=[digits.data, digits.target]):
search.fit(digits.data, digits.target)
早期试验表明,诸如 scikit-learn 的 RandomForest 之类的计算可以在集群上很好地扩展,而无需任何额外代码。
这尤其好,因为它使得 Dask 和 Scikit-Learn 能够很好地协同工作,而完全无需在 Scikit-Learn 代码库中引入 Dask。从维护的角度来看,这种组合非常有吸引力。
Jim Crist 在 dask/distributed #1022 中完成的工作。
Dask-GLM Logistic Regression
dask-glm 项目中的凸优化求解器允许我们并行且大规模地解决常见的机器学习和统计问题。历史上,这个年轻的库只包含优化求解器,用户 API 相对较少。
本周,dask-glm 新增了 LogisticRegression 和 LinearRegression 估计器,它们通过 Scikit-Learn 兼容的接口暴露了 dask-glm 中可伸缩的凸优化算法。这既可以加速单台计算机上的求解速度,也可以为之前因数据集过大而无法放入内存的情况提供解决方案。
from dask_glm.estimators import LogisticRegression
est = LogisticRegression()
est.fit(my_dask_array, labels)
这个 notebook 比较了其在单机上处理 5,000,000 个数据集时与最新版 scikit-learn 的性能。Dask-glm 的速度是 scikit-learn 的四倍,这大致相当于开发机器上的核心数量。然而,Olivier Grisel 在 这个 notebook 中展示了 scikit-learn 的开发版本(采用了新算法)在速度上超过 dask-glm 六倍。这再次表明,更聪明地使用算法几乎总是比采用并行计算更有效地利用时间。
Tom Augspurger 和 Chris White 在 dask/dask-glm #40 中完成的工作。
使用 Arrow 处理 Parquet
Parquet 格式正迅速成为并行和分布式数据帧的标准。目前有两个可通过 Python 访问的 Parquet 读写器:fastparquet,一个基于 NumPy/Numba 的解决方案;以及 Parquet-CPP,一个 C++ 解决方案,由 Arrow 提供包装器。Dask.dataframe 使用 fastparquet 支持 Parquet 已有一段时间了。
然而,用户现在可以选择使用 Arrow,只需在 dd.read_parquet
函数中切换 engine=
关键字即可。
df = dd.read_parquet('/path/to/mydata.parquet', engine='fastparquet')
df = dd.read_parquet('/path/to/mydata.parquet', engine='arrow')
希望这项功能能增加这两个项目的使用,并为这些库带来更多反馈,以便它们能够继续推动 Python 对 Parquet 格式的访问。温馨提示,通过从 CSV 切换到 Parquet,通常可以获得快得多的查询时间。这往往比并行计算更有效。
Wes McKinney 在 dask/dask #2223 中完成的工作。
稀疏数组
这里有一个小的多维稀疏数组库:https://github.com/mrocklin/sparse。当大多数元素为零时,它允许我们在内存中紧凑地表示数组。这与 scipy.sparse 中的标准解决方案不同,后者只能支持二维数组(矩阵)而不能支持更高维度的数组。
pip install sparse
>>> import numpy as np
>>> x = np.random.random(size=(10, 10, 10, 10))
>>> x[x < 0.9] = 0
>>> x.nbytes
80000
>>> import sparse
>>> s = sparse.COO(x)
>>> s
<COO: shape=(10, 10, 10, 10), dtype=float64, nnz=1074>
>>> s.nbytes
12888
>>> sparse.tensordot(s, s, axes=((1, 0, 3), (2, 1, 0))).sum(axis=1)
array([ 100.93868073, 128.72312323, 119.12997217, 118.56304153,
133.24522101, 98.33555365, 90.25304866, 98.99823973,
100.57555847, 78.27915528])
此外,这个 sparse
库更忠实地遵循 numpy.ndarray
API,这正是 dask.array
所期望的。由于 API 的这种紧密匹配,dask.array 可以像并行处理密集 numpy 数组一样轻松地围绕稀疏数组进行并行化处理。这使得我们能够以相对较低的成本获得一个不错的分布式多维稀疏数组库。
>>> import dask.array as da
>>> x = da.random.random(size=(10000, 10000, 10000, 10000),
... chunks=(100, 100, 100, 100))
>>> x[x < 0.9] = 0
>>> s = x.map_blocks(sparse.COO) # parallel array of sparse arrays
sparse
库的工作目前由我 (myself) 和 Jake VanderPlas 完成,可在此处 (here) 获取。将其与 Dask.array 连接的工作在 dask/dask #2234 中。
更好的溢出到磁盘行为
我一直在笔记本电脑上使用 1TB Criteo 数据集的 50GB 样本(这是我使用稀疏数组的地方)。为了让计算流程稍微快一些,我改进了 Dask 溢出到磁盘策略的性能。
现在,我们不再依赖 (cloud)pickle,而是使用 Dask 的网络协议,该协议更有效地处理数据,能良好压缩,并对常见的和重要类型(如 NumPy 数组以及由 NumPy 数组构建的东西,如稀疏数组)进行了特殊处理。
因此,读写过量数据到磁盘的速度显著加快。在执行机器学习计算(它们相当耗时)时,磁盘访问现在足够快,以至于我在实践中几乎察觉不到,内存不足也不会显著影响性能。
这主要在您使用常用类型(如 numpy 数组)且计算与磁盘访问比率相对较高(例如分析工作负载就是这种情况)时才真正相关,但这只是一个简单的修复,却极大地提升了我个人的生产力。
我本人在 dask/distributed #946 中完成的工作。
兼容 AsyncIO 的客户端
Dask.distributed 调度器维护一个完全异步的 API,用于与 Tornado 或 AsyncIO 等非阻塞系统配合使用。由于 Dask 支持 Python 2,我们所有的内部代码都使用 Tornado 编写。虽然 Tornado 和 AsyncIO 可以协同工作,但这通常需要一些额外的记账工作,比如将 Tornado future 转换为 AsyncIO future 等。
现在有一个 AsyncIO 专用客户端,它只包含 AsyncIO 原生的非阻塞方法。这使得在 Python 3 中编写更符合惯例的异步代码成为可能。
async with AioClient('scheduler-address:8786') as c:
future = c.submit(func, *args, **kwargs)
result = await future
Krisztián Szűcs 在 dask/distributed #1029 中完成的工作。
TLS (SSL) 支持
TLS(以前称为 SSL)是一种常见且受信任的认证和加密解决方案。对于网络内部安全很重要的公司或机构来说,这是一项经常被要求的功能。这项工作目前正在 dask/distributed #1034 中进行。我鼓励任何可能受此影响的人参与该拉取请求的讨论。
Antoine Pitrou 在 dask/distributed #1034 中完成的工作,以及之前由 Marius van Niekerk 在 dask/distributed #866 中完成的工作。
NumPy __array_ufunc__
NumPy 中的这项最新更改(正当我撰写这篇博文时刚刚合并)允许其他数组库控制现有的 NumPy ufuncs。因此,如果您调用诸如 np.exp(my_dask_array)
之类的函数,它将不再转换为 NumPy 数组,而是调用相应的 dask.array.exp
函数。这是朝着编写通用数组代码迈出的重要一步,这些代码既适用于 NumPy 数组,也适用于其他数组项目,如 dask.array、xarray、bcolz、sparse 等。
与 NumPy 中的所有重大更改一样,这项工作是通过许多人的协作完成的。numpy/numpy #8247 中的 PR。
博客评论由 Disqus 提供支持