分布式原型
本工作由 Continuum Analytics 和 XDATA Program 提供支持,作为 Blaze Project 的一部分
摘要:我们展示了一个分布式计算库的原型,并讨论了数据局部性。
分布式计算
这是一个新的分布式计算库原型。它需要一些关键性的反馈。
这篇博文在一个玩具示例中使用了 distributed
。我不会在这里讨论设计,但文档应该快速且信息丰富。我特别推荐 quickstart。
我们将在一个四节点集群上以几种不同的方式进行一个简单的计算。计算过程如下:
- 创建 1000 个随机的 numpy 数组,每个大小为 1 000 000
- 计算每个数组的和
- 计算所有和的总和
我们将直接使用分布式 Pool 进行此操作,然后再次使用 dask graph 进行。
启动集群
我在 EC2 上有一个由四台 m3.xlarge
组成的集群
ssh node1
dcenter
ssh node2
dworkder node1:8787
ssh node3
dworkder node1:8787
ssh node4
dworkder node1:8787
Pool
在客户端,我们启动一个分布式的 Pool
并将其指向中心节点。
>>> from distributed import Pool
>>> pool = Pool('node1:8787')
然后我们创建一堆随机的 numpy 数组
>>> import numpy as np
>>> arrays = pool.map(np.random.random, [1000000] * 1000)
我们的结果是一个代理对象列表,指向工作计算机上的单个 numpy 数组。我们直到需要时才移动数据。(尽管我们可以对此调用 .get()
以从 worker 获取 numpy 数组。)
>>> arrays[0]
RemoteData<center=10.141.199.202:8787, key=3e446310-6...>
对这些数据的进一步计算发生在集群上,即已经持有数据的 worker 节点上。
>>> sums = pool.map(np.sum, arrays)
这避免了昂贵的数据传输时间。不过,数据传输在必要时会发生,例如计算最终总和时。这强制了通信,因为所有中间和必须移动到一个节点进行最终的加法。
>>> total = pool.apply(np.sum, args=(sums,))
>>> total.get() # finally transfer result to local machine
499853416.82058007
distributed.dask
现在我们通过手动构建一个 dask graph 来一次性完成相同的计算(注意,这可能会变得复杂,下面有更友好的方法)。
>>> dsk = dict()
>>> for i in range(1000):
... dsk[('x', i)] = (np.random.random, 1000000)
... dsk[('sum', i)] = (np.sum, ('x', i))
>>> dsk['total'] = (sum, [('sum', i) for i in range(1000)])
>>> from distributed.dask import get
>>> get('node1', 8787, dsk, 'total')
500004095.00759566
显然,不是每个人都觉得手动编写 dask 字典很愉快。你也可以结合 dask.imperative 或 dask.array 使用。
dask.imperative
def get2(dsk, keys):
""" Make `get` scheduler that hardcodes the IP and Port """
return get('node1', 8787, dsk, keys)
>>> from dask.imperative import do
>>> arrays = [do(np.random.random)(1000000) for i in range(1000)]
>>> sums = [do(np.sum)(x) for x in arrays]
>>> total = do(np.sum)(sums)
>>> total.compute(get=get2)
499993637.00844824
dask.array
>>> import dask.array as da
>>> x = da.random.random(1000000*1000, chunks=(1000000,))
>>> x.sum().compute(get=get2)
500000250.44921482
dask 方法足够智能,可以删除所有不需要的中间数据。它可以在比我们集群所能容纳的数据量大得多的数据上智能运行。而使用 pool,我们需要手动管理数据。
>>> from distributed import delete
>>> delete(('node0', 8787), arrays)
混合使用
我们还可以混合使用这些抽象,并将来自 pool 的结果放入 dask graphs 中。
>>> arrays = pool.map(np.random.random, [1000000] * 1000)
>>> dsk = {('sum', i): (np.sum, x) for i, x in enumerate(arrays)}
>>> dsk['total'] = (sum, [('sum', i) for i in range(1000)])
讨论
Pool
和 get
用户界面相互独立,但都使用相同的底层网络,并且都基于相同的代码库构建。使用 distributed
,我想构建一个可以让我轻松进行实验的系统。到目前为止,我对结果感到非常满意。
这里一个不容忽视的主题是数据局部性。我们将中间结果保存在集群上,并尽可能在已经拥有相关数据的计算机上调度作业。如果必要,worker 之间可以相互通信,以便任何 worker 都可以执行任何作业,但我们尝试安排作业,以便在非必要时 worker 不需要通信。
另一个不容忽视的方面是,高级的 dask.array
示例无需对 dask 进行任何调整即可工作。Dask 将调度器与集合分离,这意味着现有的 dask.array 代码(或 dask.dataframe、dask.bag、dask.imperative 代码)会随着我们对新的更高级的调度器进行实验而发展。
最后,我希望这里的集群设置感觉非常简单。你确实需要一种方法在一堆机器上运行命令,但大多数拥有集群的人都有某种机制可以做到这一点,即使只是像我上面那样使用 ssh。我希望 distributed
能够降低在 Python 中进行非平凡集群计算的门槛。
免责声明
这里的一切都非常实验性。该库本身是损坏且不稳定的。它是在过去几周内制作的,尚未用于任何严肃的用途。请相应地调整预期,并提供关键性反馈。
博客评论由 Disqus 提供支持