这项工作由 Continuum AnalyticsXDATA 项目 支持,是 Blaze 项目的一部分。

tl;dr: 我们探讨了将并行编程扩展到大型集合之外的必要性。我们讨论了定制 dask 图的可用性。

近期并行工作的重点是大型集合

并行数据库、Spark 和 Dask 集合都提供了大型分布式集合,可以为您处理并行计算。您将数据放入集合中,使用少量操作(如 mapgroupby)进行编程,然后集合处理并行计算。这个想法已经变得如此流行,以至于现在有十几个项目承诺提供大型友好的 Pandas 克隆。

这很好。这些集合为大量的常见问题提供了可用、高层的接口。

定制工作负载

然而,许多工作负载对于这些集合来说过于复杂。工作负载可能很复杂,原因在于它们来自复杂的算法(正如我们在最近关于 SVD 的文章中看到的那样),或者因为它们来自现实世界,那里的问题往往很混乱。

在这些情况下,我倾向于看到人们做两件事

  1. 回退到 multiprocessingMPI 或其他某种显式的并行形式
  2. 通过巧妙地选择键,进行“脑力体操”将他们的问题塞进 Spark 中。这些情况下通常很难获得显著的加速。

直接使用 Dask 图

历史上,我推荐在这些情况下手动构建 dask 图。手动构建 dask 图可以让您指定相当任意的工作负载,然后使用 dask 调度器并行执行。 dask 文档中有一个简单的流水线数据处理示例:

def load(filename):
    ...
def clean(data):
    ...
def analyze(sequence_of_data):
    ...
def store(result):
    ...

dsk = {'load-1': (load, 'myfile.a.data'),
       'load-2': (load, 'myfile.b.data'),
       'load-3': (load, 'myfile.c.data'),
       'clean-1': (clean, 'load-1'),
       'clean-2': (clean, 'load-2'),
       'clean-3': (clean, 'load-3'),
       'analyze': (analyze, ['clean-%d' % i for i in [1, 2, 3]]),
       'store': (store, 'analyze')}

from dask.multiprocessing import get
get(dsk, 'store')  # executes in parallel

用户反馈称,这很有趣且强大,但直接在字典中编程不够直观,与 IDE 集成不佳,并且容易出错。

介绍 dask.do

要使用近似正常的 Python 代码创建相同的定制并行工作负载,我们使用 dask.do 函数。这个 do 函数将任何普通的 Python 函数变成一个延迟执行的版本,并将其添加到 dask 图中。 do 函数允许我们将上面的计算重写如下:

from dask import do

loads = [do(load)('myfile.a.data'),
         do(load)('myfile.b.data'),
         do(load)('myfile.c.data')]

cleaned = [do(clean)(x) for x in loads]

analysis = do(analyze)(cleaned)
result = do(store)(analysis)

这里的显式函数调用不会直接执行工作;相反,它们构建了一个 dask 图,我们可以使用选择的调度器并行执行该图。

from dask.multiprocessing import get
result.compute(get=get)

该接口由 Gael Varoquaux 根据他使用 joblib 的经验提出。它由 Jim CristPR (#408) 中实现。

示例:嵌套交叉验证

我和一位机器学习学生 Gabriel Krummenacher 坐下来,一起将一段进行嵌套交叉验证的小代码并行化。下面是一个使用 dask.do 进行并行化的顺序实现的对比:

您可以安全地跳过深入阅读这段代码。要点是它有些复杂,但添加并行性的工作量很小。

parallized cross validation code

并行版本在我的笔记本上运行速度快了大约四倍。免责声明:这里展示的顺序版本只是并行代码的一个降级版本,这就是为什么它们看起来如此相似。您可以在 github 上找到这段代码。

因此,我们普通的命令式 for 循环代码的结果是一个完全可并行化的 dask 图。我们将在下面可视化该图。

test_score.visualize()

Cross validation dask graph

帮助

这是一个有用的接口吗?如果大家能尝试一下并就 dask.do 提供反馈,那就太好了。

有关 dask.do 的更多信息,请参阅 dask 命令式编程文档


博客评论由 Disqus 提供