这项工作由 Continuum AnalyticsXDATA ProgramMoore Foundation 的数据驱动发现计划提供支持

摘要

Dask 刚刚发布了 0.14.0 版本。此版本包含一些重要的内部更改,以及通常的 API 覆盖范围增加和错误修复。本博文概述了自上次发布(2017 年 1 月 27 日)以来的一些主要变化。

  1. 集合间图的结构共享
  2. 重构通信系统
  3. 多项小的 DataFrame 改进
  4. 顶层持久化函数

您可以使用 Conda 或 Pip 安装新版本

conda install -c conda-forge dask distributed

pip install dask[complete] distributed --upgrade

集合间共享图

Dask 集合(数组、bag、DataFrame、延迟计算)持有包含生成所需结果所需所有任务的任务图。对于大型数据集或复杂计算,这些图可能包含数千,有时甚至数百万个任务。在某些情况下,处理这些图的开销可能会变得相当大。

这一点尤其重要,因为 Dask 集合不会就地修改它们的图,而是创建带有更新计算的新图。复制包含数百万个节点的数据图结构可能需要几秒钟,并中断交互式工作流程。

为了解决这个问题,dask.arrays 和 dask.delayed 集合现在使用具有结构共享的特殊图数据结构。这显著减少了构建重复计算时的开销。

import dask.array as da

x = da.ones(1000000, chunks=(1000,))  # 1000 chunks of size 1000

0.13.0 版本

%time for i in range(100): x = x + 1
CPU times: user 2.69 s, sys: 96 ms, total: 2.78 s
Wall time: 2.78 s

0.14.0 版本

%time for i in range(100): x = x + 1
CPU times: user 756 ms, sys: 8 ms, total: 764 ms
Wall time: 763 ms

这个玩具问题中的差异是适度的,但对于实际应用场景,这种差异可能会变得相当大。这也是气候科学界发现的阻碍他们处理 PB 级分析的障碍之一。

我们选择首先在数组和延迟计算中推出此功能,仅仅因为这是通常会产生大型任务图的两种集合。DataFrame 和 bag 暂时保持不变。

通信系统

Dask 通过 TCP socket 进行通信。它使用 Tornado 的 IOStreams 来处理非阻塞通信、帧等。我们在使用 Tornado 移动大量数据时遇到了一些性能问题。其中一些问题已在 Tornado 上游得到直接改进,但我们仍然希望将来能够选择性地放弃 Tornado 的字节处理通信堆栈。随着 Dask 在拥有更快、更特殊的互连(如超级计算机)的机构中得到应用,这一点尤为重要。我们已被多次要求支持其他传输机制,例如 MPI。

第一步(也可能是最难的一步)是使 Dask 的通信系统可插拔,以便我们可以在不进行大量源代码更改的情况下使用不同的通信选项。我们一个月前完成了这项工作,现在可以相对容易地向 Dask 添加其他传输方式。目前 TCP 仍然是唯一真正的选择,尽管还有一个实验性的 ZeroMQ 选项(相对于 TCP 几乎没有性能提升)以及一个正在开发的完全内存选项。

对于用户来说,您会看到的主要区别是现在很多地方都加上了 tcp:// 前缀。例如

$ dask-scheduler
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO -   Scheduler at:  tcp://192.168.1.115:8786
...

多项 DataFrame 变更

像往常一样,Pandas API 得到了社区贡献者的更全面覆盖。一些代表性的变更包括以下内容

  1. 支持非统一分类:在对列进行分类时,我们不再需要对数据进行完整遍历。相反,我们独立地对每个分区进行分类(即使它们具有不同的类别值),然后在必要时才统一这些类别。

    df['x'] = df['x'].astype('category')  # this is now fast
    
  2. Groupby 累积归约

    df.groupby('x').cumsum()
    
  3. 支持向 Parquet 集合追加

    df.to_parquet('/path/to/foo.parquet', append=True)
    
  4. dask.dataframes 的新字符串和 HTML 表示。通常,Pandas 通过渲染数据的前几行来在屏幕上打印 DataFrame。然而,由于 Dask.dataframes 是惰性的,我们没有这些数据,因此通常会渲染关于 DataFrame 的一些元数据。

    >>> df  # version 0.13.0
    dd.DataFrame<make-ti..., npartitions=366, divisions=(Timestamp('2000-01-01
    00:00:00', freq='D'), Timestamp('2000-01-02 00:00:00', freq='D'),
    Timestamp('2000-01-03 00:00:00', freq='D'), ..., Timestamp('2000-12-31
    00:00:00', freq='D'), Timestamp('2001-01-01 00:00:00', freq='D'))>
    

    这种渲染虽然提供了信息,但可以改进。现在我们将 DataFrame 渲染为 Pandas DataFrame,但将元数据而不是实际数据放置在 DataFrame 中。

    >>> df  # version 0.14.0
    Dask DataFrame Structure:
                           x        y      z
    npartitions=366
    2000-01-01       float64  float64  int64
    2000-01-02           ...      ...    ...
    ...                  ...      ...    ...
    2000-12-31           ...      ...    ...
    2001-01-01           ...      ...    ...
    Dask Name: make-timeseries, 366 tasks
    

    此外,在 Jupyter notebook 中,它也能很好地渲染为 HTML 表格

多项分布式系统变更

分布式系统也发生了各种各样的变化。我在这里列出一些代表性的示例,以便了解正在发生的情况

  1. 处理多个客户端时确保先到先服务优先级
  2. 通过 Channels 发送少量数据。Channels 是连接到同一调度器的多个客户端/用户之间发布和交换数据的一种方式。以前它们只传输 Future(后者反过来可以指向集群上存在的更大规模数据)。但是我们发现,传递少量元数据也很有用,例如在协作处理相同工作负载的客户端之间发送进度信号或停止标准。现在您可以在 Channels 上发布任何可由 msgpack 序列化的数据。

    # Publishing Client
    scores = client.channel('scores')
    scores.append(123.456)
    
    # Subscribing Client
    scores = client.channel('scores')
    while scores.data[-1] < THRESHOLD:
        ... continue working ...
    
  3. 我们现在更擅长估算 SciPy 稀疏矩阵和 Keras 模型的数据大小。这使得 Dask 在何时以及不何时移动数据以进行负载均衡时做出更智能的选择。此外,Dask 现在还可以序列化 Keras 模型。
  4. 为了帮助在拥有共享网络文件系统的集群上部署的人(这在科研或学术机构中很常见),调度器和工作进程现在可以使用 --scheduler-file 关键字来通信连接信息。

    dask-scheduler --scheduler-file /path/to/scheduler.json
    dask-worker --scheduler-file /path/to/scheduler.json
    dask-worker --scheduler-file /path/to/scheduler.json
    
    >>> client = Client(scheduler_file='/path/to/scheudler.json')
    

    以前我们需要传递调度器的地址,这在不知道调度器将在哪个节点上运行时可能很困难。

其他

本文未提及许多更小的细节。欲了解更多信息,请访问变更日志和文档

此外,上个月大量的 Dask 工作发生在这些核心 Dask 仓库之外

您可以使用 Conda 或 Pip 安装或升级

conda install -c conda-forge dask distributed

pip install dask[complete] distributed --upgrade

致谢

自 1 月 27 日发布 0.13.0 版本以来,以下开发者为 dask/dask 仓库做出了贡献

  • Antoine Pitrou
  • Chris Barber
  • Daniel Davis
  • Elmar Ritsch
  • Erik Welch
  • jakirkham
  • Jim Crist
  • John Crickett
  • jspreston
  • Juan Luis Cano Rodríguez
  • kayibal
  • Kevin Ernst
  • Markus Gonser
  • Matthew Rocklin
  • Martin Durant
  • Nir
  • Sinhrks
  • Talmaj Marinc
  • Vlad Frolov
  • Will Warner

以下开发者为 dask/distributed 仓库做出了贡献

  • Antoine Pitrou
  • Ben Schreck
  • bmaisonn
  • Brett Naul
  • Demian Wassermann
  • Israel Saeta Pérez
  • John Crickett
  • Joseph Crail
  • Malte Gerken
  • Martin Durant
  • Matthew Rocklin
  • Min RK
  • strets123

博客评论由 Disqus 提供支持