这项工作得到了 Continuum AnalyticsXDATA 项目以及来自 Moore 基金会的数据驱动发现计划的支持。

我很高兴地宣布 Dask 0.14.1 版本发布。此版本包含多种性能和功能改进。这篇博文包含自 2 月 27 日上次发布以来的一些显著特性和变更。

一如既往,你可以从 conda-forge 使用 conda 进行安装

conda install -c conda-forge dask distributed

或者你可以从 PyPI 使用 pip 进行安装

pip install dask[complete] --upgrade

数组

分布式计算和机器学习领域的最新工作促使我们对处理数组的方式进行了新的、面向性能和可用性的改进。

NumPy 数组的自动分块和操作

Dask 数组和 NumPy 数组之间的许多交互都运行顺畅。NumPy 数组被设置为惰性计算,并根据操作和 Dask 数组进行适当分块。

>>> x = np.ones(10)                 # a numpy array
>>> y = da.arange(10, chunks=(5,))  # a dask array
>>> z = x + y                       # combined become a dask.array
>>> z
dask.array<add, shape=(10,), dtype=float64, chunksize=(5,)>

>>> z.compute()
array([  1.,   2.,   3.,   4.,   5.,   6.,   7.,   8.,   9.,  10.])

重塑 (Reshape)

在简单的情况下,重塑分布式数组很简单;在复杂的情况下,则可能相当复杂。Reshape 现在支持更广泛的形状转换,其中任何维度都可以折叠或合并到其他维度。

>>> x = da.ones((2, 3, 4, 5, 6), chunks=(2, 2, 2, 2, 2))
>>> x.reshape((6, 2, 2, 30, 1))
dask.array<reshape, shape=(6, 2, 2, 30, 1), dtype=float64, chunksize=(3, 1, 2, 6, 1)>

这个操作在许多分布式数组场景中都非常有用。

优化切片以最小化通信

Dask.array 切片优化现在会仔细生成图,以避免可能导致过多工作节点间通信的情况。具体实现细节对于一篇简短的博文来说有点超出范围,但其发展历程很有趣。

历史上,dask.arrays 几乎完全由研究人员使用,他们处理存储为 HDF5 或 NetCDF 文件的大型磁盘数组。这些用户主要使用单机多线程调度器。我们针对这种情况对 Dask 数组优化进行了大量调整,并使该社区相当满意。现在,随着部分社区转向在更大数据集上进行集群计算,优化目标有所转变。我们拥有大量的分布式磁盘带宽,但确实希望避免在工作节点之间通信大型结果。支持这两种用例是可能的,我认为我们在本次发布中已经做到了这一点,但这开始需要越来越精心的处理。

微优化

随着分布式计算的发展,图也变得越来越大,图创建的开销也日益重要。在此版本中,这一点得到了一定的优化。我们预计这在未来将是一个重点。

DataFrames

Set_index

Set_index 在两个方面更智能

  1. 如果你在一个恰好已排序的列上使用 set_index,我们将识别这一点并避免耗时的 shuffle(混洗)操作。这总是可以通过 sorted= 关键字实现,但用户很少使用此功能。现在这是自动的了。
  2. 类似地,在设置索引时,我们可以查看数据大小,判断分区是否过多或过少,并在 shuffle 过程中重新分块数据。如果分区过多(常见情况),这可以显著提升性能。

Shuffle 性能

我们对 dataframe shuffle 的部分操作进行了微优化。非常感谢 Pandas 开发者的帮助。这加快了 set_index、join、groupby-apply 等操作。

Fastparquet

快 parquet 库(fastparquet)最近被广泛使用,并经过了社区的多次错误修复。

重要的是,Fastparquet 现在支持 Python 2。

我们强烈推荐 Parquet 作为 Dask dataframes(和 Pandas DataFrames)的标准数据存储格式。

dask/fastparquet #87

分布式调度器

重放远程异常

调试很困难,部分原因是异常发生在远程机器上,而像 pdb 这样的普通调试工具无法触及。以前我们可以带回追溯(traceback)和异常信息,但无法深入堆栈跟踪(stack trace)来调查出错的原因

def div(x, y):
    return x / y

>>> future = client.submit(div, 1, 0)
>>> future
<Future: status: error, key: div-4a34907f5384bcf9161498a635311aeb>

>>> future.result()  # getting result re-raises exception locally
<ipython-input-3-398a43a7781e> in div()
      1 def div(x, y):
----> 2     return x / y

ZeroDivisionError: division by zero

现在 Dask 可以将失败的任务和所有必要的数据带回本地机器并重新运行,以便用户可以利用普通的 Python 调试工具链。

>>> client.recreate_error_locally(future)
<ipython-input-3-398a43a7781e> in div(x, y)
      1 def div(x, y):
----> 2     return x / y
ZeroDivisionError: division by zero

现在,如果你在 IPython 或 Jupyter notebook 中,可以使用 %debug magic 命令跳入堆栈跟踪,检查局部变量等。

In [8]: %debug
> <ipython-input-3-398a43a7781e>(2)div()
      1 def div(x, y):
----> 2     return x / y

ipdb> pp x
1
ipdb> pp y
0

dask/distributed #894

Async/await 语法

Dask.distributed 使用 Tornado 进行网络通信,使用 Tornado 协程(coroutines)实现并发。普通用户很少直接与 Tornado 协程交互;大多数人不熟悉它们,因此我们选择复制 concurrent.futures API。然而,如果你了解一点异步编程,有些复杂的情况会*容易得多*解决。

幸运的是,Python 生态系统似乎正在拥抱这一变化,通过 Python 3 中的 async/await 语法转向原生异步代码。为了鼓励人们学习异步编程并温和地引导他们转向 Python 3,Dask.distributed 现在在少数情况下支持 async/await。

你可以等待一个 dask Future

async def f():
    future = client.submit(func, *args, **kwargs)
    result = await future

你可以将 as_completed 迭代器放入 async for 循环

async for future in as_completed(futures):
    result = await future
    ... do stuff with result ...

而且,由于 Tornado 支持 await 协议,你也可以将现有的 shadow concurrency API(所有以下划线开头的名称)与 await 一起使用。(这以前也可以做到。)

results = client.gather(futures)         # synchronous
...
results = await client._gather(futures)  # asynchronous

如果你使用 Python 2,总是可以通过常规的 yieldtornado.gen.coroutine 装饰器来实现这一点。

dask/distributed #952

Inproc 传输

在上次发布中,我们使 Dask 除了 TCP 之外还能通过更多方式进行通信。实际上,这并没有太多应用(TCP 已经相当好用)。然而,在此版本中,我们现在支持单机“集群”,其中客户端、调度器和工作节点都在同一进程中,并通过内存队列进行零成本的数据传输。

这使得内存计算用户社区能够使用分布式调度器中才可用的一些更高级功能(异步计算、溢写到磁盘支持、网络诊断)。

如果你使用 LocalCluster 创建集群且不使用 Nanny 进程,则默认启用此功能。

>>> from dask.distributed import LocalCluster, Client

>>> cluster = LocalCluster(nanny=False)

>>> client = Client(cluster)

>>> client
<Client: scheduler='inproc://192.168.1.115/8437/1' processes=1 cores=4>

>>> from threading import Lock         # Not serializable
>>> lock = Lock()                      # Won't survive going over a socket
>>> [future] = client.scatter([lock])  # Yet we can send to a worker
>>> future.result()                    # ... and back
<unlocked _thread.lock object at 0x7fb7f12d08a0>

dask/distributed #919

工作节点间通信的连接池

工作节点现在相互维护一个持续连接池。此连接池大小固定,并采用最近最少使用策略移除连接。这避免了在工作节点之间传输数据时的重新连接延迟。实际上,这为每次通信节省了一两毫秒。

这实际上是去年我们关闭的一项旧功能的复活,当时我们认为这方面的性能不是问题。

连同其他增强功能,这使得我的笔记本电脑上的往返延迟降低到 11ms。

In [10]: %%time
    ...: for i in range(1000):
    ...:     future = client.submit(inc, i)
    ...:     result = future.result()
    ...:
CPU times: user 4.96 s, sys: 348 ms, total: 5.31 s
Wall time: 11.1 s

不过,这里可能还有改进空间。为了比较,这里是使用 concurent.futures.ProcessPoolExecutor 进行的相同测试结果。

In [14]: e = ProcessPoolExecutor(8)

In [15]: %%time
    ...: for i in range(1000):
    ...:     future = e.submit(inc, i)
    ...:     result = future.result()
    ...:
CPU times: user 320 ms, sys: 56 ms, total: 376 ms
Wall time: 442 ms

另外,需要明确的是,这衡量的是总往返延迟,而不是开销。Dask 的分布式调度器开销仍然在几百微秒的低水平。

dask/distributed #935

Dask 和机器学习领域一直很活跃

  • dask-learn 正在进行一些性能增强。事实证明,当你提供分布式网格搜索时,人们会很快希望将计算扩展到数十万次试验。
  • dask-glm 现在有了一些不错的凸优化算法。如果你有兴趣,该项目的作者最近写了一篇博文:在 Dask 中开发凸优化算法
  • dask-xgboost 允许你传递 Dask dataframes 或 arrays 中的分布式数据,并将其直接交给分布式 XGBoost 系统(Dask 会为你很好地设置和拆除)。这是两个运行在同一进程中的分布式服务之间轻松移交的一个很好的例子。

致谢

自 2 月 27 日 0.14.0 版本发布以来,以下人员对 dask/dask 仓库做出了贡献:

  • Antoine Pitrou
  • Brian Martin
  • Elliott Sales de Andrade
  • Erik Welch
  • Francisco de la Peña
  • jakirkham
  • Jim Crist
  • Jitesh Kumar Jha
  • Julien Lhermitte
  • Martin Durant
  • Matthew Rocklin
  • Markus Gonser
  • Talmaj

自 2 月 27 日 1.16.0 版本发布以来,以下人员对 dask/distributed 仓库做出了贡献:

  • Antoine Pitrou
  • Ben Schreck
  • Elliott Sales de Andrade
  • Martin Durant
  • Matthew Rocklin
  • Phil Elson

博客评论由 Disqus 提供