这项工作得到了 Continuum AnalyticsXDATA 项目 的支持,作为 Blaze 项目 的一部分

本文中的所有代码都是实验性的。不应依赖于此。希望在集群上部署 dask.distributed 的用户请参考 文档

目前,Dask 部署在以下实际系统上

  • SGE
  • SLURM,
  • Torque
  • Condor
  • LSF
  • Mesos
  • Marathon
  • Kubernetes
  • SSH 和自定义脚本
  • … 可能还有更多。这是我亲身了解到的。

这些系统为用户提供对集群资源的访问,并确保许多分布式服务/用户能够良好协作。它们对于任何现代集群部署都是必不可少的。

在这些集群资源管理器上部署 Dask 的人是高级用户;他们知道资源管理器如何工作,并且会阅读关于 如何设置 Dask 集群的文档。一般来说,这些用户对结果很满意;然而,我们应该降低这个门槛,以便可以使用集群资源管理器的非高级用户也能轻松地在他们的集群上使用 Dask。

不幸的是,存在一些挑战

  1. 存在多种集群资源管理器,每种都有相当多的用户采用。有限的开发者时间使我们无法支持所有这些管理器。
  2. 横向扩展的策略差异很大。例如,我们可能需要固定数量的工作节点,或者可能需要根据当前使用情况进行扩展的工作节点。不同的群体会有不同的解决方案。
  3. 单个集群部署具有高度可配置性。Dask 需要快速退出,让现有技术自行配置。

本文讨论了其中一些问题。它不包含明确的解决方案。

示例:Kubernetes

例如,Olivier Griesl (INRIA, scikit-learn) 和 Tim O’Donnell (西奈山医院, Hammer 实验室) 都发布了关于如何在 Kubernetes 上部署 Dask.distributed 的说明。

这些说明组织良好。它们包括 Dockerfile、发布的镜像、Kubernetes 配置文件以及如何与云提供商基础设施交互的说明。Olivier 和 Tim 显然都知道他们在做什么,并且乐于帮助其他人也做到这一点。

Tim(后来跟进者)不知道 Olivier 的解决方案,并自己写了一份。Tim 能够做到这一点,但许多初学者则不然。

一个解决方案 是将这些解决方案的重要注册列表包含在 Dask 文档中,以便人们能够找到高质量的参考资料作为起点。我在这里启动了一个资源列表:dask/distributed #547 指向其他资源的评论将非常受欢迎。

然而,即使 Tim 找到了 Olivier 的解决方案,我怀疑他仍然需要修改它。Tim 在软件和可扩展性方面的需求与 Olivier 不同。这就引出了一个问题:“Dask 应该提供什么,又应该留给管理员什么?” 也许我们能做的最好的事情就是支持复制-粘贴-编辑的工作流程。

什么是 Dask 特有的,什么是资源管理器特有的,以及每次需要手动配置什么?

自适应部署

为了探讨可分离解决方案这一主题,我构建了一个小型的 Dask.distributed 自适应部署系统,基于 Marathon,一个基于 Mesos 的编排平台。

这个解决方案做了两件事

  1. 它根据当前的使用情况动态扩展 Dask 集群。如果调度器中有更多任务,它就会请求更多工作节点。
  2. 它使用 Marathon 部署这些工作节点。

为了鼓励复制,这两个不同的方面由两段不同的代码解决,并有清晰的 API 边界。

  1. 一个与后端无关的自适应组件,用于指示何时以及如何安全地扩展工作节点和缩减工作节点
  2. 一个特定于 Marathon 的组件,使用 Marathon HTTP API 部署或销毁 dask 工作节点

这结合了策略,即自适应扩展,与后端,即Marathon结合起来,这样两者都可以轻松替换。例如,我们可以用固定策略替换自适应策略,以始终保持 N 个工作节点在线,或者我们可以用 Kubernetes 或 Yarn 替换 Marathon。

我希望这个演示能够鼓励其他人开发第三方软件包。本文的其余部分将深入探讨这个特定的解决方案。

自适应性

distributed.deploy.Adaptive 类封装了 Scheduler,并决定何时以及应扩展多少节点,以及何时应缩减并指定释放哪些空闲工作节点。

当前的策略相当简单明了

  1. 如果存在未分配的任务或任何可窃取的任务且没有空闲工作节点,或者平均内存使用率超过 50%,则按固定因子(默认为二)增加工作节点数量。
  2. 如果存在空闲工作节点且平均内存使用率低于 50%,则回收数据量最少的空闲工作节点(在将数据移动到附近工作节点后),直到内存使用率接近 50%。

认为这个策略可以改进或者有其他想法吗?太好了。它易于实现并且完全与主代码分离,因此你应该可以轻松编辑它或创建自己的策略。当前的实现大约有 80 行 (源代码)。

然而,这个 Adaptive 类实际上并不知道如何执行扩展操作。相反,它依赖于接收一个独立的对象,该对象有两个方法:scale_upscale_down

class MyCluster(object):
    def scale_up(n):
        """
        Bring the total count of workers up to ``n``

        This function/coroutine should bring the total number of workers up to
        the number ``n``.
        """
        raise NotImplementedError()

    def scale_down(self, workers):
        """
        Remove ``workers`` from the cluster

        Given a list of worker addresses this function should remove those
        workers from the cluster.
        """
        raise NotImplementedError()

这个集群对象包含了后端特定的关于如何扩展和缩减的细节,但没有关于自适应逻辑的何时扩展和缩减的逻辑。单机 LocalCluster 对象作为一个参考实现。

因此,我们将这个自适应方案与一个部署方案结合起来。我们将使用一个微小的 Dask-Marathon 部署库,可从此处获取。

from dask_marathon import MarathonCluster
from distributed import Scheduler
from distributed.deploy import Adaptive

s = Scheduler()
mc = MarathonCluster(s, cpus=1, mem=4000,
                     docker_image='mrocklin/dask-distributed')
ac = Adaptive(s, mc)

这以可组合的方式将策略(Adaptive)与部署方案(Marathon)结合起来。Adaptive 集群监控调度器,并在必要时调用 MarathonCluster 上的 scale_up/down 方法。

Marathon 代码

因为我们将所有的“何时”逻辑隔离到了 Adaptive 代码中,所以特定于 Marathon 的代码非常简短且具体。下面我们提供一个稍微简化的版本。构造函数中有相当多的特定于 Marathon 的设置,然后是下面简单的 scale_up/down 方法

from marathon import MarathonClient, MarathonApp
from marathon.models.container import MarathonContainer


class MarathonCluster(object):
    def __init__(self, scheduler,
                 executable='dask-worker',
                 docker_image='mrocklin/dask-distributed',
                 marathon_address='http://localhost:8080',
                 name=None, cpus=1, mem=4000, **kwargs):
        self.scheduler = scheduler

        # Create Marathon App to run dask-worker
        args = [
            executable,
            scheduler.address,
            '--nthreads', str(cpus),
            '--name', '$MESOS_TASK_ID',  # use Mesos task ID as worker name
            '--worker-port', '$PORT_WORKER',
            '--nanny-port', '$PORT_NANNY',
            '--http-port', '$PORT_HTTP'
        ]

        ports = [{'port': 0,
                  'protocol': 'tcp',
                  'name': name}
                 for name in ['worker', 'nanny', 'http']]

        args.extend(['--memory-limit',
                     str(int(mem * 0.6 * 1e6))])

        kwargs['cmd'] = ' '.join(args)
        container = MarathonContainer({'image': docker_image})

        app = MarathonApp(instances=0,
                          container=container,
                          port_definitions=ports,
                          cpus=cpus, mem=mem, **kwargs)

        # Connect and register app
        self.client = MarathonClient(marathon_address)
        self.app = self.client.create_app(name or 'dask-%s' % uuid.uuid4(), app)

    def scale_up(self, instances):
        self.client.scale_app(self.app.id, instances=instances)

    def scale_down(self, workers):
        for w in workers:
            self.client.kill_task(self.app.id,
                                  self.scheduler.worker_info[w]['name'],
                                  scale=True)

这并非无关紧要,你需要了解 Marathon 才能理解这一点,但幸运的是,你不需要了解太多其他东西。我希望熟悉其他集群资源管理器的人能够编写类似的对象,并像我在这里为 Marathon 解决方案所做的那样,将它们作为第三方库发布:https://github.com/mrocklin/dask-marathon (感谢 Ben Zaitlen 为此设置了出色的测试框架并启动了一切。)

自适应策略

类似地,我们可以设计新的部署策略。你可以在 Adaptive 类的策略中了解更多,参考文档源代码(大约八十行)。我鼓励大家实现和使用其他策略,并将那些在实践中证明有用的策略贡献回来。

总结思考

我们提出了一个问题

  • 分布式系统如何在保持合理性的同时支持多种集群资源管理器和多种调度策略?

我们提出了两种解决方案

  1. 维护解决方案链接的注册列表,支持复制-粘贴-编辑的实践
  2. 开发一个 API 边界,鼓励第三方库的可分离开发。

目前尚不清楚这两种解决方案是否足够,或者当前任何一种解决方案的实现是否良好。然而,这是一个重要的问题,因为目前 Dask.distributed 主要仍由超级用户使用。我希望在我们寻找良好解决方案的过程中,能够激发社区的创造力。


博客评论由 Disqus 提供支持