这项工作得到了 Continuum AnalyticsXDATA 项目以及 Moore Foundation 数据驱动发现计划的支持。

摘要

我们使用 Dask 构建分布式优化算法。我们展示了简单的示例,以及用于广义线性模型的新兴 dask-glm 库的基准测试。我们还谈论了学习使用 Dask 进行此类工作的经验。

这篇博文由了解优化的 Chris White(Capital One 公司)和了解分布式计算的 Matthew Rocklin(Continuum Analytics 公司)合著。

引言

许多机器学习和统计模型(如逻辑回归)依赖于牛顿法、随机梯度下降等凸优化算法。这些优化算法既实用(在许多应用中使用)又具有数学趣味性。因此,这些算法多年来一直是全球学术界和工业界研究人员和研究生的研究课题。

大约五到十年前,当数据集规模增长到超出工作内存,“大数据”成为流行词时,事情变得有趣起来。这些算法的并行和分布式解决方案已成为常态,研究人员的技能现在必须超越线性代数和优化理论,包括并行算法,甚至可能包括网络编程,特别是如果你想探索和创建更有趣的算法。

然而,相对较少的人既懂数学优化理论,又懂分布式系统的细节。通常,算法研究人员依赖于 Spark 或 Flink 等分布式计算库的 API 来实现他们的算法。在这篇博文中,我们将探讨 Dask 在这些应用中的帮助程度。我们将从两个角度来探讨这一点

  1. 算法研究人员(Chris):了解优化和迭代算法(如共轭梯度法 Conjugate Gradient、对偶上升法 Dual Ascent 或 GMRES)但不熟悉分布式计算主题(如 sockets、MPI、负载均衡等)的人
  2. 分布式系统开发者(Matt):知道如何在机器间移动字节并保持机器忙碌,但不知道如何正确进行线搜索(line search)或处理病态矩阵(poorly conditioned matrix)的人

在 Dask 中原型化算法

有了算法知识和 NumPy 数组计算知识,使用 Dask 编写并行算法就很简单。对于一系列复杂的算法结构,我们有两种直接的选择:

  1. 使用并行多维数组从矩阵乘法、SVD 等常见操作构建算法。这很好地反映了数学算法,但缺乏一些灵活性。
  2. 手动创建算法,跟踪内存数据单个块上的操作以及它们之间的依赖关系。这非常灵活,但需要更细心一些。

从头开始实现这两种选择中的任何一种都可能是一项艰巨的任务,但使用 Dask 可以像编写 NumPy 代码一样简单。

让我们构建一个示例,演示如何使用内置的数组并行性以及 Dask 提供的更高级、更定制化的并行化功能来拟合大型线性回归模型。dask.array 模块通过使用相同的语法,帮助我们轻松地并行化标准的 NumPy 功能——我们将从这里开始。

数据创建

Dask 有多种创建 Dask 数组的方法;为了快速开始原型开发,让我们用 NumPy 用户应该熟悉的方式创建一些随机数据。

import dask
import dask.array as da
import numpy as np

from dask.distributed import Client

client = Client()

## create inputs with a bunch of independent normals
beta = np.random.random(100)  # random beta coefficients, no intercept
X = da.random.normal(0, 1, size=(1000000, 100), chunks=(100000, 100))
y = X.dot(beta) + da.random.normal(0, 1, size=1000000, chunks=(100000,))

## make sure all chunks are ~equally sized
X, y = dask.persist(X, y)
client.rebalance([X, y])

注意 X 是一个存储在 10 个块中的 Dask 数组,每个块的大小为 (100000, 100)。还要注意 X.dot(beta) 对于 numpy 数组和 dask 数组都能平稳运行,因此我们可以编写基本上在这两种环境下都能工作的代码。

注意:如果 X 是一个 numpy 数组而 beta 是一个 dask 数组,X.dot(beta) 将输出一个内存中(in-memory)numpy 数组。这通常不是我们想要的,因为你需要谨慎选择何时将数据加载到内存中。一种解决方法是使用 multipledispatch 来处理一些特殊的边缘情况;有关示例,请在此处查看 dot 代码。

Dask 还内置了方便的可视化功能,我们将利用这些功能;下面我们在其 10 个独立的块中可视化我们的数据

Create data for dask-glm computations

数组编程

如果你可以用 NumPy 编写基于数组的迭代算法,那么你就可以用 Dask 编写迭代并行算法。

正如我们已经看到的,Dask 继承了许多我们熟悉的 NumPy API,因此我们可以编写简单的 NumPy 风格的迭代优化算法,这些算法将利用 dask.array 内置的并行性。例如,如果我们想天真地(naively)对上述数据拟合线性回归模型,我们正在尝试解决以下凸优化问题

\[\min_{\beta} \|y - X\beta\|_2^2\]

回想一下,在非奇异情况下,这个问题有一个封闭形式的解,由下式给出:

\[\beta^* = \left(X^T X\right)^{-1} X^T y\]

我们可以使用 Dask 和上面的公式计算 $\beta^*$。

## naive solution
beta_star = da.linalg.solve(X.T.dot(X), X.T.dot(y))

>>> abs(beta_star.compute() - beta).max()
0.0024817567237768179

有时直接求解成本太高,我们想仅使用简单的矩阵向量乘法来解决上述问题。为此,让我们更进一步,实际实现一个利用并行矩阵运算的梯度下降算法。回想一下,梯度下降通过以下更新迭代地改进 beta 的初始估计:

\[\beta^+ = \beta - \alpha \nabla f(\beta)\]

其中 $\alpha$ 可以根据多种不同的“步长”规则选择;为了便于说明,我们将使用一个恒定步长。

## quick step-size calculation to guarantee convergence
_, s, _ = np.linalg.svd(2 * X.T.dot(X))
step_size = 1 / s - 1e-8

## define some parameters
max_steps = 100
tol = 1e-8
beta_hat = np.zeros(100) # initial guess

for k in range(max_steps):
    Xbeta = X.dot(beta_hat)
    func = ((y - Xbeta)**2).sum()
    gradient = 2 * X.T.dot(Xbeta - y)

    ## Update
    obeta = beta_hat
    beta_hat = beta_hat - step_size * gradient
    new_func = ((y - X.dot(beta_hat))**2).sum()
    beta_hat, func, new_func = dask.compute(beta_hat, func, new_func)  # <--- Dask code

    ## Check for convergence
    change = np.absolute(beta_hat - obeta).max()

    if change < tol:
        break

>>> abs(beta_hat - beta).max()
0.0024817567259038942

值得注意的是,几乎所有这些代码都与等效的 NumPy 代码完全相同。由于 Dask.array 和 NumPy 共享相同的 API,对于已经熟悉 NumPy 的人来说,立即开始使用分布式算法非常容易。我们唯一需要改变的是生成原始数据的方式(使用 da.random.normal 而不是 np.random.normal),以及在更新状态结束时调用 dask.computedask.compute 调用告诉 Dask 执行我们迄今为止告诉它要做的所有计算(Dask 默认是惰性的 lazy)。除此之外,所有数学运算、矩阵乘法、切片等都与 Numpy 完全相同,不同之处在于 Dask.array 为我们构建了按块(chunk-wise)的并行计算,而 Dask.distributed 可以在并行环境中执行该计算。

为了更好地理解上述算法在一个更新步骤中发生的所有调度(scheduling),这里有一个计算 beta_hat 和新的函数值 new_func 所需计算的可视化图。

Gradient descent step Dask graph

每个矩形是我们的分布式数组的一个内存块(in-memory chunk),每个圆圈是这些内存块上的 NumPy 函数调用。Dask 调度器(scheduler)决定了在我们的机器集群上(或者仅仅在笔记本电脑的核心上)何时何地运行所有这些计算。

数组编程 + dask.delayed

现在我们已经看到了如何使用 Dask.array 提供的内置并行算法,让我们更进一步,讨论如何编写更定制化的并行算法。机器学习中许多基于“共识”(consensus)的分布式算法都基于这样一个想法:每个数据块可以独立并行处理,并将其对最优参数值的猜测发送给某个主节点(master node)。主节点计算出最优参数的共识估计值,并将其报告回所有工作节点(workers)。然后每个工作节点根据这些新信息处理自己的数据块,这个过程持续进行直到收敛。

从并行计算的角度来看,这是一个非常简单的 MapReduce 过程。任何分布式计算框架都应该能轻松处理这一点。我们将用它作为一个非常简单的例子来说明如何使用 Dask 更定制化的并行选项。

一种这样的算法是交替方向乘子法(Alternating Direction Method of Multipliers),简称 ADMM。为了本文的目的,我们将把每个工作节点所做的工作视为一个黑盒(black box)。

我们还将考虑上述问题的正则化(regularized)版本,即

\[\min_{\beta} \|y - X\beta\|_2^2 + \lambda \|\beta\|_1\]

最终,我们要做的只是

  • 创建 NumPy 函数,定义每个块如何更新其参数估计值
  • 使用 dask.delayed 封装这些函数
  • 调用 dask.compute 并处理各个估计值,再次使用 NumPy

首先,我们需要定义一些本地(local)函数,块将使用这些函数来更新其各自的参数估计值,并从 dask_glm 中导入黑盒 local_update 步骤;此外,我们还需要所谓的收缩(shrinkage)算子(这是我们问题中 $l1$-范数的近端算子 proximal operator)。

from dask_glm.algorithms import local_update

def local_f(beta, X, y, z, u, rho):
    return ((y - X.dot(beta)) **2).sum() + (rho / 2) * np.dot(beta - z + u,
                                                                  beta - z + u)

def local_grad(beta, X, y, z, u, rho):
    return 2 * X.T.dot(X.dot(beta) - y) + rho * (beta - z + u)


def shrinkage(beta, t):
    return np.maximum(0, beta - t) - np.maximum(0, -beta - t)

## set some algorithm parameters
max_steps = 10
lamduh = 7.2
rho = 1.0

(n, p) = X.shape
nchunks = X.npartitions

XD = X.to_delayed().flatten().tolist()  # A list of pointers to remote numpy arrays
yD = y.to_delayed().flatten().tolist()  # ... one for each chunk

# the initial consensus estimate
z = np.zeros(p)

# an array of the individual "dual variables" and parameter estimates,
# one for each chunk of data
u = np.array([np.zeros(p) for i in range(nchunks)])
betas = np.array([np.zeros(p) for i in range(nchunks)])

for k in range(max_steps):

    # process each chunk in parallel, using the black-box 'local_update' magic
    new_betas = [dask.delayed(local_update)(xx, yy, bb, z, uu, rho,
                                            f=local_f,
                                            fprime=local_grad)
                 for xx, yy, bb, uu in zip(XD, yD, betas, u)]
    new_betas = np.array(dask.compute(*new_betas))

    # everything else is NumPy code occurring at "master"
    beta_hat = 0.9 * new_betas + 0.1 * z

    # create consensus estimate
    zold = z.copy()
    ztilde = np.mean(beta_hat + np.array(u), axis=0)
    z = shrinkage(ztilde, lamduh / (rho * nchunks))

    # update dual variables
    u += beta_hat - z

>>> # Number of coefficients zeroed out due to L1 regularization
>>> print((z == 0).sum())
12

当然,上述算法中还有一些额外的工作,但应该清楚的是,分布式操作不是其中困难的部分。使用 dask.delayed,我们能够用同样简单的 Python for 循环和 delayed 函数调用来表达像 ADMM 这样简单的 MapReduce 算法。dask.delayed 会跟踪我们想要进行的所有函数调用以及它们依赖的其他函数调用。例如,所有 local_update 调用都可以相互独立地发生,但共识计算(consensus computation)会阻塞直到所有这些调用完成。

我们希望上面展示的两种并行算法(梯度下降、ADMM)对于具有优化背景的读者来说是直观易懂的。这些实现可以在笔记本电脑、单台多核工作站或必要时在千节点集群上良好运行。我们一直在 dask-glm 中构建这些算法(及其他算法)更复杂的实现。从优化的角度来看,它们更复杂(包括停止准则 stopping criteria、步长 step size、异步性 asynchronicity 等),但从分布式计算的角度来看仍然简单。

实验

我们在笔记本电脑上将 dask-glm 实现与 Scikit-learn 进行比较,然后展示它们在集群上的运行情况。

可复现的 notebook 可在此处获取

我们正在 dask-glm 中构建上述算法更复杂的版本。该项目包含了用于梯度下降、近端梯度下降、牛顿法和 ADMM 的凸优化算法。这些实现通过考虑停止准则、步长以及为简化而避免的其他细节,扩展了上面的实现。

在本节中,我们将通过进行一个简单的数值实验来展示这些算法,比较近端梯度下降和 ADMM 与 Scikit-Learn 的 LogisticRegression 和 SGD 实现在单台机器(个人笔记本电脑)上的数值性能,然后通过将 dask-glm 选项扩展到中等规模的集群来进一步说明。

免责声明:这些实验是初步的(crude)。我们使用的是人工数据,我们没有调整参数,甚至没有找到能使这些算法产生相同精度结果的参数。本节的目标只是大致说明不同方法的比较感觉。

我们创建数据

## size of problem (no. observations)
N = 8e6
chunks = 1e6
seed = 20009
beta = (np.random.random(15) - 0.5) * 3

X = da.random.random((N,len(beta)), chunks=chunks)
y = make_y(X, beta=np.array(beta), chunks=chunks)

X, y = dask.persist(X, y)
client.rebalance([X, y])

并按如下方式运行我们的每种算法

# Dask-GLM Proximal Gradient
result = proximal_grad(X, y, lamduh=alpha)

# Dask-GLM ADMM
X2 = X.rechunk((1e5, None)).persist()  # ADMM prefers smaller chunks
y2 = y.rechunk(1e5).persist()
result = admm(X2, y2, lamduh=alpha)

# Scikit-Learn LogisticRegression
nX, ny = dask.compute(X, y)  # sklearn wants numpy arrays
result = LogisticRegression(penalty='l1', C=1).fit(nX, ny).coef_

# Scikit-Learn Stochastic Gradient Descent
result = SGDClassifier(loss='log',
                       penalty='l1',
                       l1_ratio=1,
                       n_iter=10,
                       fit_intercept=False).fit(nX, ny).coef_

然后我们使用 $L_{\infty}$ 范数(最大差值)进行比较。

abs(result - beta).max()

下表显示了这些参数的运行时间和与真实“生成 beta”的 $L_\infty$ 距离

算法 误差 时长 (秒)
近端梯度法 0.0227 128
ADMM 0.0125 34.7
LogisticRegression 0.0132 79
SGDClassifier 0.0456 29.4

再次强调,请不要过于认真看待这些数字:这些算法都解决正则化问题,因此我们不期望结果必然接近底层的生成 beta(即使是渐近地)。上面的数字旨在表明它们返回的结果与上述 beta 的距离大致相同。此外,Dask-glm 使用了完整的四核笔记本电脑,而 SKLearn 限制为使用单核。

在下面的部分中,我们包含了近端梯度法和 ADMM 的性能剖析图(profile plots)。这些图显示了八个线程中的每一个随时间进行的操作。你可以将鼠标悬停在矩形/任务上,并使用右上角的缩放工具进行放大。你可以看到算法复杂度的差异。从 Dask 的角度来看,ADMM 简单得多,但对于此块大小,它也能更好地饱和硬件(saturates hardware)。

近端梯度下降法的性能剖析图

ADMM 的性能剖析图

总的来说,这里的重点是 dask-glm 在单台机器上的性能与 Scikit-Learn 相当。如果你的问题可以在单台机器的内存中处理,你应该继续使用 Scikit-Learn 和 Statsmodels。dask-glm 算法的真正优势在于它们具有可伸缩性(scale),并且可以通过在单台计算机上从磁盘操作或在计算机集群上协作运行,高效处理大于内存的数据。

集群计算

作为演示,我们在 EC2 上的一个由八个 m4.2xlarge 节点组成的集群上运行上述数据的更大版本(每个节点有 8 个核心和 30GB 内存)。

我们创建一个更大的数据集,包含 8 亿行和 15 列,分布在八个进程中。

N = 8e8
chunks = 1e7
seed = 20009
beta = (np.random.random(15) - 0.5) * 3

X = da.random.random((N,len(beta)), chunks=chunks)
y = make_y(X, beta=np.array(beta), chunks=chunks)

X, y = dask.persist(X, y)

然后我们运行与之前相同的 proximal_gradadmm 操作

# Dask-GLM Proximal Gradient
result = proximal_grad(X, y, lamduh=alpha)

# Dask-GLM ADMM
X2 = X.rechunk((1e6, None)).persist()  # ADMM prefers smaller chunks
y2 = y.rechunk(1e6).persist()
result = admm(X2, y2, lamduh=alpha)

近端梯度法在大约十七分钟内完成,而 ADMM 在大约四分钟内完成。下面包含了这两种计算的性能剖析图。

近端梯度下降法的性能剖析图

这里我们只包含前几次迭代。否则,这个图将有几兆字节。

全屏图链接

ADMM 的性能剖析图

全屏图链接

两者都获得了与我们之前观察到的类似的 $L_{\infty}$ 误差。

算法 误差 时长 (秒)
近端梯度法 0.0306 1020
ADMM 0.00159 270

尽管这次我们需要注意几件事

  1. 在重新分块(rechunking)后,我们显式地删除了旧数据(ADMM 倾向于使用与近端梯度法不同的块大小),因为我们的完整数据集 100GB 已经非常接近我们的总分布式内存(240GB),所以最好避免不必要地保留副本。如果不这样做,计算也能正常运行,但将多余数据溢出到磁盘会 negatively affect performance(负面影响性能)。
  2. 我们将环境变量 OMP_NUM_THREADS=1 设置为 1,以避免 CPU 过度订阅(over-subscribing)。令人惊讶的是,不这样做会导致性能下降和非确定性结果。这是一个我们仍在追踪的问题。

分析

Dask-GLM 中的算法是新的,需要进一步开发,但对于在此技术水平上操作的人来说,它们处于可用状态。此外,我们希望吸引其他数学和算法开发者参与这项工作。我们发现 Dask 在灵活性(足以支持有趣的算法)和管理性(足以供没有强大分布式系统背景的研究人员使用)之间提供了很好的平衡。在本节中,我们将讨论我们从 Chris(数学算法)和 Matt(分布式系统)的角度学到的东西,然后谈论可能的未来工作。我们鼓励大家关注未来的工作;我们对合作持开放态度,并认为这是新研究人员有意义地参与的好机会。

Chris 的视角

  1. 使用 Dask 创建分布式算法出奇地容易;尽管对于何时调用 persistcomputerebalance 等操作仍有一个小小的学习曲线,但这无法避免。使用 Dask 进行算法开发是一个很好的学习环境,有助于理解与分布式算法相关的独特挑战(包括通信成本等)。
  2. 正确掌握算法的细节并不容易;在更好地理解许多算法中出现的容差设置与精度权衡(tolerance settings vs. accuracy tradeoffs)以及为提高精度而微调收敛准则(convergence criteria)方面,仍有工作要做。
  3. 在软件开发方面,可靠地测试优化算法很困难。找到应该满足的且在数值上稳定的可证明正确的 optimality conditions(最优性条件)一直对我来说是个挑战。
  4. 独自研究算法远不如合作有趣;请加入讨论并贡献力量!
  5. 从我的角度来看最重要的是,我发现“社区”中存在着令人惊讶的大量误解,关于优化算法在预测建模领域的作用、它们各自解决的问题以及在给定问题中它们是否可互换。例如,牛顿法不能用于优化 l1 正则化问题,并且 l1 正则化问题得到的系数估计值与 l2 正则化问题(以及非正则化问题)的估计值在根本上(和数值上)是不同的。我个人的目标是让 dask-glm 的 API 更透明地揭示这些细微的区别,并引导“实际应用中”做出更周全的模型决策。

Matt 的视角

这项工作在 Dask 库内部触发了许多具体的改变

  1. 我们可以将 Dask.dataframes 转换为 Dask.arrays。这特别重要,因为人们希望使用 dataframe 进行预处理,然后切换到高效的多维数组用于算法。
  2. 我们不得不稍微统一单机调度器和分布式调度器的 API,特别是为单机调度器添加了 persist 函数。这特别重要,因为 Chris 通常在他的笔记本电脑上进行原型开发,但我们希望编写能在集群上高效运行的代码。
  3. 调度器开销(Scheduler overhead)对于迭代的 dask-array 算法(梯度下降、近端梯度下降、BFGS)可能是一个问题。这个问题尤其突出,因为 NumPy 非常快。通常我们的任务只需要几毫秒,这使得 Dask 每任务 200 微秒的开销变得非常重要(这就是为什么你在上面的性能剖析图中看到空白的原因)。我们已经开始通过一些方法解决这个问题,例如更积极的任务融合(task fusion)和普遍降低开销,但这将是一个中长期挑战。在实际使用 dask-glm 中,我们已经开始通过选择合适的块大小来解决这个问题。我猜想,特别是对于 dask-glm,我们将开发自动块大小启发式算法(auto-chunksize heuristics),这将大部分解决这个问题。然而,我们预计在使用 HPC 系统与科学家进行其他工作时,这个问题会再次出现,他们也面临类似的情况。
  4. 对于算法用户来说,有几件事可能比较棘手
    1. 放置异步启动计算(persist, compute)的调用。实际上,Chris 在这方面做得很好,然后我来进行了微调。Web 诊断工具最终对于识别问题至关重要。
    2. 避免意外地在 dask.arrays 上调用 NumPy 函数,反之亦然。我们在 dask.array 端改进了这一点,现在当给定 numpy 数组时,它们会智能地操作。在 NumPy 端改变这一点比较困难,直到 NumPy 协议发生变化(这正在计划中)。

未来工作

我们想做很多事情,无论是在衡量方面还是针对 dask-glm 项目本身。我们欢迎大家就以下问题发表意见(并加入开发):

  1. 异步算法
  2. 用户 API
  3. 扩展 GLM 系列
  4. 编写更广泛严格的算法测试 - 为了满足可证明的最优性条件(provable optimality criteria),以及为了对各种输入数据的鲁棒性(robustness)
  5. 开始研究智能初始化例程

亲爱的读者,您对此有何看法?Matt 和 Chris 在这个项目上都需要帮助。我们希望上面的一些问题能为社区参与提供契机。我们欢迎其他问题、意见和贡献,无论是通过 GitHub Issue 还是在下方评论。

致谢

还要感谢 Hussain Sultan(Capital One 公司)和 Tom Augspurger 在 Dask-GLM 上的合作,以及 Will Warner(Continuum 公司)对本文的审阅和编辑。


博客评论由 Disqus 提供支持