这项工作由 Continuum AnalyticsXDATA 项目 提供支持,作为 Blaze 项目 的一部分

tl;dr Blaze 为我们关于外存 ND 数组的上一篇文章增加了可用性

免责声明:本文讨论的是实验性的、有 bug 的代码。尚未准备好供公众使用。

设置

这延续了我的 上一篇文章,该文章设计了一个简单的任务调度器,用于外存(或分布式)ND 数组。我们将带有数据依赖的任务编码为简单的字典。然后我们构建了函数来创建描述分块数组操作的字典。我们发现这是一种有效但不够友好的方式来解决一些重要但繁琐的问题。

本文借助 blazeinto 提升了编程体验,从而提供类似 NumPy 的外存体验。

旧的底层代码

这是我们为外存转置/点积编写的代码(实际上是对称的 rank-k 更新)。

在磁盘上创建随机数组

import bcolz
import numpy as np
b = bcolz.carray(np.empty(shape=(0, 1000), dtype='f8'),
                 rootdir='A.bcolz', chunklen=1000)
for i in range(1000):
    b.append(np.random.rand(1000, 1000))
b.flush()

定义计算 A.T * A

d = {'A': b}
d.update(getem('A', blocksize=(1000, 1000), shape=b.shape))

# Add A.T into the mix
d.update(top(np.transpose, 'At', 'ij', 'A', 'ji', numblocks={'A': (1000, 1)}))

# Dot product A.T * A
d.update(top(dotmany, 'AtA', 'ik', 'At', 'ij', 'A', 'jk',
         numblocks={'A': (1000, 1), 'At': (1, 1000)}))

使用 Blaze 编写的更易用的新代码

面向用户

上一节“定义计算”采用了一种风格,这种风格非常适合库开发者和自动化系统,但对于习惯了 Matlab/NumPy 或 R/Pandas 风格的用户来说则具有挑战性。

我们使用 Blaze 来封装这个过程,Blaze 是一个可扩展的分析计算前端

使用 Blaze 重新定义计算 A.T * A

from dask.obj import Array  # a proxy object holding on to a dask dict
from blaze import *

# Load data into dask dictionaries
dA = into(Array, 'A.bcolz', blockshape=(1000, 1000))
A = Data(dA)  # Wrap with blaze.Data

# Describe computation in friendly numpy style
expr = A.T.dot(A)

# Compute results
>>> %time compute(expr)
CPU times: user 2min 57s, sys: 6.4 s, total: 3min 3s
Wall time: 2min 50s
array([[ 334071.93541158,  250297.16968262,  250404.87729587, ...,
         250436.85274716,  250330.64262904,  250590.98832611],
       [ 250297.16968262,  333451.72293343,  249978.2751824 , ...,
         250103.20601281,  250014.96660956,  250251.0146828 ],
       [ 250404.87729587,  249978.2751824 ,  333279.76376277, ...,
         249961.44796719,  250061.8068036 ,  250125.80971858],
       ...,
       [ 250436.85274716,  250103.20601281,  249961.44796719, ...,
         333444.797894  ,  250021.78528189,  250147.12015207],
       [ 250330.64262904,  250014.96660956,  250061.8068036 , ...,
         250021.78528189,  333240.10323875,  250307.86236815],
       [ 250590.98832611,  250251.0146828 ,  250125.80971858, ...,
         250147.12015207,  250307.86236815,  333467.87105673]])

内部实现

在内部实现上,Blaze 创建了与我们上次手动创建的相同的 dask 字典。我修改了此处渲染的结果,以包含提示性名称。

>>> compute(expr, post_compute=False).dask
{('A': carray((10000000, 1000), float64), ...
 ...
 ('A', 0, 0): (ndget, 'A', (1000, 1000), 0, 0),
 ('A', 1, 0): (ndget, 'A', (1000, 1000), 1, 0),
 ...
 ('At', 0, 0): (np.transpose, ('A', 0, 0)),
 ('At', 0, 1): (np.transpose, ('A', 1, 0)),
 ...
 ('AtA', 0, 0): (dotmany, [('At', 0, 0), ('At', 0, 1), ('At', 0, 2), ...],
                          [('A', 0, 0),  ('A', 1, 0),  ('A', 2, 0), ...])
}

然后我们在单个核心上按顺序计算。然而,我们可以将其传递给分布式系统。此结果包含所有必要信息,可以按照您选择的任何方式从磁盘上的数组得到计算结果。

分离后端与前端

回想一下,Blaze 是数据分析技术的可扩展前端。它使我们能够使用令人愉悦且熟悉的用户中心 API 来封装混乱的计算 API。将 Blaze 扩展到 dask 字典只花了半个下午,是一件直接明了的工作。这种分离使我们能够继续构建面向 dask 的解决方案,而无需担心用户界面问题。通过将后端工作与前端工作分离,我们可以使双方都更清晰,并更快地发展。

未来工作

我目前正在度假。最近帖子的工作是在晚上和家人一起看电视时完成的。它不是特别健壮。尽管如此,用相对较少的精力就能取得如此显著的效果,这令人兴奋。

也许现在是提及 Continuum 拥有充足的拨款资金的好时机。我们正在寻找希望创建可用的大规模数据分析工具的人才。顺便说一句,我辞去了学术博士后职位来从事这项工作,并且对这个转变非常满意。

源代码

此代码是实验性的,并且有 bug。我不指望它会永远保持目前的形式(它会改进)。尽管如此,如果您在它发布时正在阅读本文,那么您可能想查看以下内容:

  1. dask 的 master 分支
  2. 我的 blaze fork 上的 array-expr 分支

博客评论由 Disqus 提供支持