这项工作由 Anaconda Inc. 提供支持。

我很高兴宣布 Dask 0.18.0 版本发布。这是一个重要的版本,包含破坏性更改和新功能。上一个版本 0.17.5 发布于 5 月 4 日。这篇博文概述了自 3 月 21 日发布的 0.17.2 版本博文以来的一些显著变化。

您可以使用 conda 安装 Dask

conda install dask

或从 PyPI 使用 pip 安装

pip install dask[complete] --upgrade

完整的更新日志可在此处获取

下面我们列出了一些破坏性更改,随后是一些不太重要但仍然有趣的变化。

背景

Dask 核心库正接近 1.0 版本发布。在此之前,我们需要做一些清理工作。此版本启动了这个过程,替换了一些现有接口,并构建了一些所需的基础设施。此版本中几乎所有更改都包含了清晰的弃用警告,但未来版本将完全移除旧功能,因此现在是时候检查一下了。

和任何开始进行破坏性更改的版本一样,这次也增加了很多其他小的破坏性更改。我个人对这个版本非常满意,因为现在使用 Dask 的许多方面感觉更加清晰,但 Dask 的重度用户可能会遇到轻微的阻力。希望这篇博文能帮助解释一些较大的变化。

值得注意的破坏性更改

集中式配置

充分利用 Dask 有时需要用户配置,尤其是在分布式环境中。这可能涉及控制日志详细程度、指定集群配置、提供安全凭据,或生产中出现的其他一些选项。

我们发现不同的计算文化喜欢以几种不同的方式指定配置

  1. 配置文件
  2. 环境变量
  3. 直接在 Python 代码中

以前,不同的 dask 子项目使用各种不同的解决方案来处理这个问题。dask-distributed 项目有一个系统,dask-kubernetes 有另一个,等等。

现在,我们将配置集中在 dask.config 模块中,该模块从配置文件、环境变量和运行时代码收集配置,并将其集中提供给所有 Dask 子项目。许多 Dask 子项目 (dask.distributed, dask-kubernetes, 和 dask-jobqueue) 正在同时发布,以利用这一特性。

如果您之前积极使用 Dask.distributed 的配置文件,有些事情已经改变

  1. 配置现在有了命名空间并更深地嵌套。以下是当前 dask.distributed 默认配置文件 的示例

    distributed:
      version: 2
      scheduler:
      allowed-failures: 3 # number of retries before a task is considered bad
      work-stealing: True # workers should steal tasks from each other
      worker-ttl: null # like '60s'. Workers must heartbeat faster than this
    
      worker:
      multiprocessing-method: forkserver
      use-file-locking: True
    
  2. 默认配置位置已从 ~/.dask/config.yaml 移动到 ~/.config/dask/distributed.yaml,该位置将与 kubernetes.yamljobqueue.yaml 等其他几个文件放在一起。

然而,您的旧配置文件仍然会被找到,并且其值会被适当使用。但我们不会尝试将您的旧配置值迁移到新位置。如果您想保持特别整洁,可能需要删除自动生成的 ~/.dask/config.yaml 文件。

您可以在 Dask 配置文档 中了解更多关于 Dask 配置的信息

将常用的 get= 关键字替换为 scheduler=

Dask 可以使用基于线程、进程、单线程执行或分布式集群等多种调度器后端来执行代码。

以前,用户使用一个通用名称的 get= 关键字来选择这些后端

x.compute(get=dask.threaded.get)
x.compute(get=dask.multiprocessing.get)
x.compute(get=dask.local.get_sync)

我们已将其替换为一个更新、有望更清晰的 scheduler= 关键字

x.compute(scheduler='threads')
x.compute(scheduler='processes')
x.compute(scheduler='single-threaded')

The get= 关键字已被弃用并将引发警告。它将在下一个主要版本中完全移除。

更多信息请参阅 选择不同调度器的文档

将 dask.set_options 替换为 dask.config.set

与配置更改相关的是,我们现在在配置中包含了运行时状态。以前人们使用 dask.set_options 上下文管理器设置运行时状态。现在我们推荐使用 dask.config.set

with dask.set_options(scheduler='threads'):  # Before
    ...

with dask.config.set(scheduler='threads'):  # After
    ...

The dask.set_options 函数现在是 dask.config.set 的别名。

移除了 dask.array.learn 子包

这个子包没有宣传,使用率非常低。所有功能(以及更多功能)现在都在 Dask-ML 中可用。

其他

  • 我们从 map_blocks 中移除了 token= 关键字,并将功能移到了 name= 关键字。
  • 当您关闭上下文管理器时,dask.distributed.worker_client 会自动重新加入线程池。
  • Dask.distributed 协议现在将 msgpack 数组解释为元组而不是列表。

有趣的新功能

数组

广义通用函数

Dask.array 现在透明地支持 Numpy 风格的 广义通用函数 (gufuncs)。这意味着您可以将正常的 Numpy GUFuncs(如下例中的 eig)直接应用于 Dask 数组

import dask.array as da
import numpy as np

# Apply a Numpy GUFunc, eig, directly onto a Dask array
x = da.random.normal(size=(10, 10, 10), chunks=(2, 10, 10))
w, v = np.linalg._umath_linalg.eig(x, output_dtypes=(float, float))
# w and v are dask arrays with eig applied along the latter two axes

Numpy 的许多内部函数都有 gufuncs,但它们尚未决定将这些函数切换到公共 API。此外,我们可以与 Numba 等其他项目一起定义 GUFuncs

import numba

@numba.vectorize([float64(float64, float64)])
def f(x, y):
    return x + y

z = f(x, y)  # if x and y are dask arrays, then z will be too

我喜欢这一点是因为 Dask 和 Numba 的开发者在这个功能上完全没有协调,只是它们都支持 Numpy GUFunc 协议,所以您免费获得了这样的交互。

更多信息请参阅 Dask 的 GUFunc 文档。这项工作由 Markus Gonser (@magonser) 完成。

rechunking 的新值“auto”

Dask 数组现在接受一个值“auto”,可以在以前接受 chunk 值的所有地方使用。这会要求 Dask 重新分块这些维度,以达到一个良好的默认块大小。

x = x.rechunk({
    0: x.shape[0], # single chunk in this dimension
  # 1: 100e6 / x.dtype.itemsize / x.shape[0],  # before we had to calculate manually
    1: 'auto'      # Now we allow this dimension to respond to get ideal chunk size
})

# or
x = da.from_array(img, chunks='auto')

这还会检查 array.chunk-size 配置值以获取最佳块大小

>>> dask.config.get('array.chunk-size')
'128MiB'

需要明确的是,这并不支持“自动分块”,后者通常是一个非常困难的问题。用户仍然需要了解他们的计算以及他们希望如何分块,这只是让做出良好决策变得稍微容易一些。

算法改进

Dask.array 新增了完整的 einsum 实现,感谢 Simon Perkins

此外,Dask.array 的 QR 分解在两个方面变得更好了

  1. 它们支持 short-and-fat 数组
  2. The tall-and-skinny 变体现在在更少的内存中更稳定地运行。这是一个友好的执行 GIF 动图

这项工作非常值得赞赏,并由 Jeremy Chan 完成。

得益于 Martin DurantJohn A Kirkham,Dask 原生支持用于分块 n 维数组的 Zarr 格式。Zarr 因其速度快、规范简单、支持完整的 NetCDF 风格约定以及适用于云存储而特别有用。

Dataframes 和 Pandas 0.23

和往常一样,Dask Dataframes 有许多小改进。值得注意的是,它继续兼容刚刚发布的 Pandas 0.23,并新增了一些数据摄入格式。

Dask.dataframe 与最近发布的 Pandas 0.23 中的更改保持一致,感谢 Tom Augspurger

支持 Orc

Dask.dataframe 新增了对 Apache ORC 格式的读取器。

Orc 是一种表格数据存储格式,在 Hadoop 生态系统中很常见。新的 dd.read_orc 函数并行化了 PyArrow 中类似的新 ORC 功能。感谢 Jim Crist 在 Arrow 方面的工作,以及 Martin Durant 使用 Dask 对其进行并行化。

支持 Read_json

Dask.dataframe 现在也新增了对 JSON 文件的读取器。

The dd.read_json 函数与 pandas.read_json API 的大部分功能匹配。

这项功能是在最近的一次 PyCon 2018 演讲比较 Spark 和 Dask dataframe 中,Irina Truong 提到该功能缺失后不久实现的。感谢 Martin DurantIrina Truong 的贡献。

关于 JSON、ORC 或 Dask.dataframe 支持的任何其他格式的更多信息,请参阅 dataframe 数据摄入文档

Joblib

用于 Scikit-Learn 内并行计算的 Joblib 库已经有一段时间拥有 Dask 后端了。虽然它一直都相当容易使用,但现在即使没有太多专业知识也能更好地使用它。在与 Scikit-Learn 开发人员一起实际使用一段时间后,我们发现并解决了一些可用性问题。这些更改只有在下一个 Scikit-Learn 版本发布后(希望很快)才能完全可用,届时我们可能会发布一篇专门讨论该主题的新博文。

此版本与以下软件包同步发布

  1. dask
  2. distributed
  3. dask-kubernetes

还有一个新的用于在 YARN(Hadoop 环境中常见的作业调度器)上部署应用程序的仓库,名为 skein。欢迎早期采用者。

致谢

自 3 月 21 日以来,以下人员为以下仓库做出了贡献

用于并行算法的核心 Dask 仓库

  • Andrethrill
  • Beomi
  • Brendan Martin
  • Christopher Ren
  • Guido Imperiale
  • Diane Trout
  • fjetter
  • Frederick
  • Henry Doupe
  • James Bourbeau
  • Jeremy Chen
  • Jim Crist
  • John A Kirkham
  • Jon Mease
  • Jörg Dietrich
  • Kevin Mader
  • Ksenia Bobrova
  • Larsr
  • Marc Pfister
  • Markus Gonser
  • Martin Durant
  • Matt Lee
  • Matthew Rocklin
  • Pierre-Bartet
  • Scott Sievert
  • Simon Perkins
  • Stefan van der Walt
  • Stephan Hoyer
  • Tom Augspurger
  • Uwe L. Korn
  • 余枫

用于分布式计算的 dask/distributed 仓库

  • Bmaisonn
  • Grant Jenks
  • Henry Doupe
  • Irene Rodriguez
  • Irina Truong
  • John A Kirkham
  • Joseph Atkins-Turkish
  • Kenneth Koski
  • Loïc Estève
  • Marius van Niekerk
  • Martin Durant
  • Matthew Rocklin
  • Olivier Grisel
  • Russ Bubley
  • Tom Augspurger
  • Tony Lorenzo

用于在 Kubernetes 上部署 Dask 的 dask-kubernetes 仓库

  • Brendan Martin
  • J Gerard
  • Matthew Rocklin
  • Olivier Grisel
  • Yuvi Panda

用于在 HPC 作业调度器上部署 Dask 的 dask-jobqueue 仓库

  • Guillaume Eynard-Bontemps
  • jgerardsimcock
  • Joseph Hamman
  • Loïc Estève
  • Matthew Rocklin
  • Ray Bell
  • Rich Signell
  • Shawn Taylor
  • Spencer Clark

用于可扩展机器学习的 dask-ml 仓库

  • Christopher Ren
  • Jeremy Chen
  • Matthew Rocklin
  • Scott Sievert
  • Tom Augspurger

致谢

感谢 Scott Sievert 和 James Bourbeau 帮助编辑本文。


博客评论由 Disqus 提供支持