执行摘要

这篇博文解释了分布式计算和不规则或形状不整齐输出相关的一些困难。我们将介绍一种在这种情况下使用 Dask 的推荐方法。

背景

通常,我们会遇到这样的工作流程:分析数据涉及搜索特征(这些特征可能存在也可能不存在),然后根据这些特征计算一些结果。由于我们无法提前知道会找到多少特征,因此处理输出的大小会随之变化。

对于分布式工作负载,我们需要将数据分割、处理,然后将结果重新组合。这意味着当 Dask 组合输出时,不规则输出可能会导致问题(如广播错误)。

问题约束

在这篇博文中,我们将看一个具有以下约束条件的示例:

  • 输入数组数据
  • 需要块之间存在重叠的处理函数
  • 返回的输出

解决方案

最简单的策略是两步流程:

  1. 使用 overlap 函数扩展数组块。
  2. 使用带有 drop_axis 关键字参数map_blocks

示例代码

import dask.array as da

arr = da.random.random((100, 100), chunks=(50,50))  # example input data
expanded = da.overlap.overlap(arr, depth=2, boundary="reflect")
result = expanded.map_blocks(processing_func, drop_axis=1, dtype=float)
result.compute()

支持多种输出类型

这种模式支持处理函数的多种输出类型,包括:

  • numpy 数组
  • pandas Series
  • pandas DataFrames

您可以使用下面任何一个示例处理函数自行尝试,生成模拟数据输出。或者,您也可以尝试使用您自己的函数。

# Random length, 1D output returned
import numpy as np
import pandas as pd

# function returns numpy array
def processing_func(x):
    random_length = np.random.randint(1, 7)
    return np.arange(random_length)

# function returns pandas series
def processing_func(x):
    random_length = np.random.randint(1, 7)
    output_series = np.arange(random_length)
    return pd.Series(output_series)

# function returns pandas dataframe
def processing_func(x):
    random_length = np.random.randint(1, 7)
    x_data = np.arange(random_length)
    y_data = np.random.random((random_length))
    return pd.DataFrame({"x": x_data, "y": y_data})

为什么我不能使用 map_overlapreduction

对于某些 Dask 函数,不规则的输出大小在组合输出时可能导致广播错误。

但是,如果输出大小不规则不是您特定编程问题的限制,那么您可以随意继续使用 Dask 的 map_overlapreduction 函数。

替代方案

Dask delayed

作为替代方案,您可以使用 Dask delayed此处提供教程)。

优点

  • 您的处理函数可以具有任何类型的输出(不受 numpy 或 pandas 对象的限制)
  • 您在使用 Dask delayed 的方式上有更大的灵活性。

缺点

  • 您将不得不自行处理输出的组合。
  • 您必须更加注意性能
    • 例如,因为下面的代码在列表推导式中使用了 delayed,出于性能原因,传入预期的元数据非常重要。幸运的是,dask 提供了一个 make_meta 函数。
    • 您可以在此处阅读更多关于 Dask delayed 的性能注意事项和最佳实践。

示例代码

import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd
import dask

arr = da.ones((20, 10), chunks=(10, 10))

@dask.delayed
def processing_func(x):
    # returns dummy dataframe output
    random_length = np.random.randint(1,10)
    return pd.DataFrame({'x': np.arange(random_length),
                         'y': np.random.random(random_length)})

meta = dd.utils.make_meta([('x', np.int64), ('y', np.int64)])
expanded = da.overlap.overlap(arr, depth=2, boundary="reflect")
blocks = expanded.to_delayed().ravel()
results = [dd.from_delayed(processing_func(b), meta=meta) for b in blocks]
ddf = dd.concat(results)
ddf.compute()

总结

就是这样!我们学习了如何在使用返回不规则输出的处理函数时避免常见错误。此处推荐的方法适用于多种输出类型,包括:numpy 数组、pandas series 和 pandas DataFrames。


博客评论由 Disqus 提供支持