这项工作得到了 Anaconda Inc. 以及来自 摩尔基金会 (Moore Foundation) 的数据驱动发现计划 (Data Driven Discovery Initiative) 的支持。

我很高兴地宣布 Dask 版本 0.16.0 发布了。这是一个主要版本,包含了新特性、破坏性变更和稳定性改进。这篇博文概述了自 9 月 24 日发布 0.15.3 版本以来的重要变化。

你可以使用 conda 安装 Dask

conda install dask

或使用 pip 从 PyPI 安装

pip install dask[complete] --upgrade

Conda 包在 conda-forge 和默认通道上均可获得。

完整的变更日志可在此处查看

下面是一些值得注意的变化。

破坏性变更

  • 为了兼容 Python 3.7,dask.async 模块已移至 dask.local。该模块此前已被弃用,现在已完全移除。
  • 分布式调度器的诊断 JSON 页面已被移除,并替换为信息更丰富的模板化 HTML。
  • 常用的私有方法 _keys_optimize 的使用已被 Dask collection 接口取代(见下文)。

Dask Collection 接口

现在使用 Dask Collection 接口实现自定义集合更加容易。

Dask 集合(arrays、dataframes、bags、delayed)通过一些内部方法与 Dask 调度器(单机、分布式)交互。我们已将此接口正式化为诸如 .__dask_graph__().__dask_keys__() 之类的协议,并已发布了该接口。任何实现该文档中描述的方法的对象都将作为一流的 Dask 对象与所有 Dask 调度器功能交互。

class MyDaskCollection(object):
    def __dask_graph__(self):
        ...

    def __dask_keys__(self):
        ...

    def __dask_optimize__(self, ...):
        ...

    ...

此接口已在 XArray 项目中为标记和索引数组实现。现在所有 XArray 类(DataSet、DataArray、Variable)都能被所有 Dask 调度器完全理解。它们与 dask.arrays 或 dask.dataframes 一样是一流的对象。

import xarray as xa
from dask.distributed import Client

client = Client()

ds = xa.open_mfdataset('*.nc', ...)

ds = client.persist(ds)  # XArray object integrate seamlessly with Dask schedulers

Dask collection 接口的工作主要由 Jim Crist 完成。

带宽与 Tornado 5 兼容性

Dask 构建在用于并发网络编程的 Tornado 库之上。为了改善在特殊硬件(Infiniband)上的工作节点间带宽,Dask 开发者正在提议修改 Tornado 的网络基础设施。

然而,为了使用这些更改,Dask 本身需要运行在正在开发的 Tornado 的下一个版本 Tornado 5.0.0 上,这破坏了 Dask 所依赖的许多接口。Dask 开发者一直在解决这些问题,我们也鼓励其他 PyData 开发者这样做。例如,Bokeh 和 Jupyter 都无法在 Tornado 5.0.0-dev 上运行。

Dask 的工作节点间带宽在理论上可达 3GB/s 的网络上峰值约为 1.5-2GB/s。 GitHub issue: pangeo #6

Dask worker bandwidth

网络性能和 Tornado 兼容性主要由 Antoine Pitrou 负责处理。

Parquet 兼容性

Dask.dataframe 可以使用 Python 中两种常见的 Parquet 库中的任一种:Apache Arrow 和 Fastparquet。每个库都有自己的优势和偏好它的用户群。我们显著扩展了 Dask 的 Parquet 测试套件,以覆盖每个库,扩展了往返兼容性。值得注意的是,现在您可以使用 PyArrow 进行读写。

df.to_parquet('...', engine='fastparquet')
df = dd.read_parquet('...', engine='pyarrow')

这里仍有一些工作要做。市面上各种各样的 Parquet 读写器和规范使得完全解决这个问题变得困难。很高兴看到各个项目正慢慢趋向于共同的功能。

这项工作由 Uwe Korn、Jim Crist 和 Martin Durant 共同完成。

重试任务

Dask.distributed 调度器最受期待的功能之一是重试失败任务的能力。这对于将 Dask 用作任务队列而不是大型 dataframe 或 array 的用户来说尤其有用。

future = client.submit(func, *args, retries=5)

任务重试功能主要由 Antoine Pitrou 构建。

事务性工作窃取

Dask.distributed 任务调度器通过工作窃取来执行负载均衡。以前,这有时会导致同一个任务在两个位置同时运行。现在窃取是事务性的,这意味着它将避免意外地两次运行同一个任务。对于使用 Dask 任务产生副作用的人来说,这种行为尤其重要。

同一个任务仍然有可能运行两次,但现在这种情况只发生在更极端的情况下,例如工作节点死亡或 TCP 连接断开,这两种情况在标准硬件上都不常见。

事务性工作窃取主要由 Matthew Rocklin 实现。

新诊断页面

仪表板的 Info 选项卡中有一组新的诊断网页。这些页面提供了关于每个工作节点和任务更深入的信息,但没有任何动态内容。它们使用 Tornado 模板而不是 Bokeh 图形,这意味着它们的响应性较低,但构建起来更容易。这是一种简单且廉价的方式来暴露更多的调度器状态。

Task page of Dask's scheduler info dashboard

嵌套计算调用

现在在任务 内部 调用 .compute() 会调用相同的分布式调度器。这使得编写更复杂的工作负载时,无需过多考虑启动工作节点客户端。

import dask
from dask.distributed import Client
client = Client()  # only works for the newer scheduler

@dask.delayed
def f(x):
    ...
    return dask.compute(...)  # can call dask.compute within delayed task

dask.compute([f(i) for ...])

嵌套计算调用主要由 Matthew Rocklin 和 Olivier Grisel 开发。

更积极的垃圾回收

现在,工作节点在内存压力大和释放数据时,会在不同时间显式调用 gc.collect()。这有助于避免一些内存泄漏,尤其是在使用 Pandas dataframes 时。事实证明,谨慎地执行此操作需要惊人的细节程度。

改进的垃圾回收主要由 Fabian Keller 和 Olivier Grisel 实现和测试,并采纳了 Antoine Pitrou 的建议。

Dask-ML

各种 Dask 机器学习项目现在正被整合到一个统一的仓库中,即 dask-ml。我们鼓励用户和研究人员阅读该项目。我们相信其中包含许多有用且有趣的方法。

组织和管理这些算法的工作主要由 Tom Augspurger 负责。

XArray

用于索引和标记数组的 XArray 项目本周也发布了其主要版本 0.10.0,其中包括许多性能改进,特别是对于在大型数据集上使用 Dask。

致谢

自 9 月 24 日发布 0.15.3 版本以来,以下人员对 dask/dask 仓库做出了贡献

  • Ced4
  • Christopher Prohm
  • fjetter
  • Hai Nguyen Mau
  • Ian Hopkinson
  • James Bourbeau
  • James Munroe
  • Jesse Vogt
  • Jim Crist
  • John Kirkham
  • Keisuke Fujii
  • Matthias Bussonnier
  • Matthew Rocklin
  • mayl
  • Martin Durant
  • Olivier Grisel
  • severo
  • Simon Perkins
  • Stephan Hoyer
  • Thomas A Caswell
  • Tom Augspurger
  • Uwe L. Korn
  • 季伟
  • xwang777

自 9 月 24 日发布 1.19.1 版本以来,以下人员对 dask/distributed 仓库做出了贡献

  • Alvaro Ulloa
  • Antoine Pitrou
  • chkoar
  • Fabian Keller
  • Ian Hopkinson
  • Jim Crist
  • Kelvin Yang
  • Krisztián Szűcs
  • Matthew Rocklin
  • Mike DePalatis
  • Olivier Grisel
  • rbubley
  • Tom Augspurger

以下人员对 dask/dask-ml 仓库做出了贡献

  • Evan Welch
  • Matthew Rocklin
  • severo
  • Tom Augspurger
  • Trey Causey

此外,我们很荣幸地宣布 Olivier Grisel 已接受 Dask 项目的提交权限。Olivier 在分布式调度器以及 Joblib、SKLearn 和 Cloudpickle 等相关项目上一直非常活跃。


博客评论由 Disqus 提供支持