这项工作由 Continuum Analytics 提供支持

引言

机构使用软件的方式与个人不同。在过去的几个月里,我就在大学、研究实验室、私营企业和非营利学习系统等大型组织内使用 Dask 的问题进行了几十次交流。这篇文章非常粗略地总结了这些交流,并提取出常见问题。接下来我将尝试回答这些问题。

注意:这篇文章有些地方必然会比较模糊。有些公司倾向于保护隐私。这里的详细信息要么来自 Dask 的公开议题,要么是与足够多的机构(例如至少五个)交流后提出的,我才认为可以在这里列出这些问题。

常见情况

机构 X,一家大学/研究实验室/公司/……,拥有许多科学家/分析师/建模师,他们使用 Python、PyData 技术栈(如 NumPy/Pandas/SKLearn)和大量定制代码来开发模型和分析数据。这些模型/数据有时会变得非常庞大,需要适量的并行计算能力。

幸运的是,机构 X 拥有一个内部集群,正是为了加速大型计算和数据集的建模与分析而购置的。用户可以使用作业调度系统,如 SGE/LSF/Mesos/其他,向集群提交作业。

然而,该集群仍然未得到充分利用,用户仍然寻求并行计算方面的帮助。这可能是因为用户不习惯使用 SGE/LSF/Mesos/其他 的界面,或者它不支持足够复杂/动态的工作负载,或者交互时间不足以满足用户所重视的交互式使用需求。

之前曾有一项内部努力,希望在 SGE/LSF/Mesos/其他之上构建一个更复杂/交互/Pythonic 的系统,但它不够成熟,而且机构 X 肯定不想继续投入。事实证明,在内部设计/构建/维护这样的系统比预期的要困难。他们非常希望找到一个功能完善且由社区维护的开源解决方案。

Dask.distributed 调度器看起来已经满足了机构 X 90% 的需求。然而,仍有一些悬而未决的问题:

  • 我们如何将 dask.distributed 与 SGE/LSF/Mesos/其他 作业调度系统集成?
  • 我们如何根据使用情况动态地扩展和收缩集群?
  • 用户如何管理 worker 上的软件环境?
  • 分布式调度器有多安全?
  • Dask 对 worker 故障具有弹性,那么调度器故障呢?
  • 如果 dask-worker 分布在两个不同的数据中心会发生什么?我们可以非对称地进行扩展吗?
  • 我们如何处理多个并发用户和优先级?
  • 这与 Spark 相比如何?

因此,在这篇文章的剩余部分,我将回答这些问题。像往常一样,很少有答案会是“是的,Dask 可以解决你所有的问题”。这些都是悬而未决的问题,而不是那些容易回答的问题。我们将深入探讨今天可能实现什么,以及我们将来如何解决这些问题。

我们如何将 dask.distributed 与 SGE/LSF/Mesos/其他 集成?

使用 SGE/LSF/Mesos/其他 等工具,在现有集群内大规模部署 dask.distributed 并不困难。在许多情况下,机构内已经有研究人员通过在集群中的某个静态节点上手动运行 dask-scheduler,然后使用他们的作业调度系统和一个小型作业脚本启动几百个 dask-worker 来完成这项工作。

现在的目标是,如何针对机构内部使用的 SGE/LSF/Mesos/其他 的特定版本,将这一过程进行标准化,同时开发和维护一个标准的 Pythonic 接口,以便 Dask 开发者在可预见的未来能够以较低成本维护所有这些工具。在某些情况下,机构 X 很乐意为开发一个便捷的“在我的作业调度器上启动 dask”工具付费,但他们不太愿意永久支付维护费用。

我们希望 Python 用户能够像下面这样表达:

from dask.distributed import Executor, SGECluster

c = SGECluster(nworkers=200, **options)
e = Executor(c)

… 并让这个相同的接口在不同的作业调度器之间实现标准化。

我们如何根据使用情况动态地扩展和收缩集群?

或者,我们可以有一个 24/7 运行的 dask.distributed 部署,它根据当前负载动态地扩展和收缩自身。同样,如果你想手动操作(你可以随时添加和移除 worker),这在今天完全可能实现,但我们应该为调度器添加一些信号,例如:

  • “我负载很高,请添加 worker”
  • “我空闲了一段时间,请回收 worker”

并将这些信号连接到一个与作业调度器通信的管理器。这会从用户手中移除一部分控制权,并将其交给一个策略,IT 部门可以调整该策略,以便更好地与同一网络上的其他服务协作。

用户如何管理 worker 上的软件环境?

目前,Dask 假设所有用户和 worker 共享完全相同的软件环境。有一些小型工具可以将更新的 .py.egg 文件发送给 worker,但也仅此而已。

通常,Dask 相信完整的软件环境将由其他东西处理。在传统的集群设置中,这可能是网络文件系统 (NFS) 挂载,或者可能是通过其他工具(如用于 YARN 部署的 knit 或更定制化的工具)来移动 Docker 或 conda 环境。例如,Continuum 销售专有软件 来实现这一点。

对于机构来说,设置标准的软件环境通常不是什么大问题。他们通常已经有现成的系统来处理这个问题。有趣之处在于,当用户想要使用与系统环境截然不同的环境时,例如使用 Python 2 而不是 Python 3,或者安装一个最新版本的 scikit-learn。他们可能还想在单个会话中多次更改软件环境。

我能想到的最好的解决方案是,使用 dask.distributed 网络(它擅长在网络中移动大型二进制 blob)传递完全下载好的 conda 环境,然后教 dask-worker 如何在这个环境中自举。我们应该能够在短时间内拆除并重新启动所有内容。这需要一些工作;首先是制作可重定位的 conda 二进制文件(通常没问题,但由于链接问题并非总是万无一失),然后是帮助 dask-worker 学习如何自举。

与此有点相关的是,Capital One 的 Hussain Sultan 最近贡献了一个 dask-submit 命令,用于在集群上运行脚本:https://distributed.dask.org.cn/en/latest/submitting-applications.html

分布式调度器有多安全?

Dask.distributed 非常不安全。它允许任何具有调度器网络访问权限的人在未受保护的环境中执行任意代码。数据以明文形式发送。任何恶意攻击者都可以窃取您的秘密并破坏您的集群。

然而,这完全是常态。安全性通常由管理像 Dask 这样的计算框架的其他服务来处理。

例如,我们可以依靠 Docker 来隔离 worker,防止它们破坏周围环境,并依靠网络访问控制来保护数据访问。

因为 Dask 运行在 Tornado 这个强大的网络库和 Web 框架之上,所以我们可以轻松实现一些功能,例如启用 SSL、身份验证等。但是,我犹豫是否应该只提供“一点点安全”,而不是一步到位,以免产生虚假的安全感。简而言之,如果没有大量的鼓励,我没有计划在这方面投入工作。即使如此,我仍然强烈建议机构将 Dask 与旨在实现安全的工具结合使用。我相信这对于分布式计算系统来说是普遍的做法。

Dask 对 worker 故障具有弹性,那么调度器故障呢?

Worker 可以加入和退出。客户端可以加入和退出。调度器中的状态目前是不可替代的,并且没有尝试进行备份。在这里你可以设想几种可能性:

  1. 将状态和近期事件备份到持久存储,以便在发生灾难性损失时可以恢复状态
  2. 设置一个热备故障转移节点,获取调度器执行的每一个操作的副本
  3. 让多个对等调度器同时运行,以便它们可以接替故障对等节点的工作
  4. 让客户端记住他们已经提交的内容,并在调度器重新上线时重新提交

目前,选项 4 是最可行的方式,并且已经满足了大部分需求。然而,如果 Dask 最终要在大型机构中作为关键基础设施运行,那么选项 2 或 3 可能是必需的。我们目前还没有达到那个阶段。

得益于加州大学伯克利分校/BIDS 的 Stefan van der Walt 推动的近期工作,调度器现在可以在宕机后恢复,并且所有客户端都将重新连接。正在进行的计算的状态会完全丢失,但计算基础设施保持完好,这样人们就可以重新提交作业而不会遭受重大的服务损失。

Dask 在这个话题上处理起来稍微困难一些,因为它提供了一个持久有状态的接口。对于那些从持久存储运行短暂查询、返回结果然后清除状态的分布式数据库项目来说,这个问题要容易得多。

如果 dask-worker 分布在两个不同的数据中心会发生什么?我们可以非对称地进行扩展吗?

简短的答案是不能。除了核心数和可用 RAM 之外,所有 worker 都被视为彼此相等(除非用户明确指定)。

然而,这个问题以及类似的问题最近经常出现。以下是一些类似情况的例子:

  1. 地理上分布在全国的多个数据中心
  2. 单个数据中心内的多个机架
  3. 拥有 GPU 且可以轻松地在彼此之间移动数据的多个 worker
  4. 单台机器上的多个进程

从长远来看,拥有某种分层 worker 分组机制或 worker 间优先关系的概念可能是不可避免的。与所有分布式调度问题一样,难点不在于决定这是否有用,甚至不在于提出一个合理的方案,而在于如何基于合理的方案做出万无一失且能在常数时间内操作的决策。我个人目前还没有看到一个好的方法,但预计随着更多高优先级的用例出现,会有一个方案产生。

我们如何处理多个并发用户和优先级?

这里有几个子问题:

  • 多个用户可以同时在我的集群上使用 Dask 吗?

是的,可以通过启动独立的调度器/worker 集,或者共享同一组调度器/worker。

  • 如果他们共享相同的 worker,它们会不会互相覆盖数据?

这不太可能。Dask 在命名任务时非常小心,因此两个用户提交的计算结果不同但占用内存中相同键的情况非常罕见。然而,如果他们提交的计算在某种程度上有重叠,调度器会很好地避免重复计算。当有许多人在同一硬件上进行略有不同的计算时,这会非常有用。它的工作方式与 Git 类似。

  • 如果他们共享相同的 worker,它们会不会互相占用资源?

是的,这确实可能发生。如果你担心这个问题,那么你应该给每个人分配他们自己的调度器/worker(这很容易,而且是标准做法)。Dask 目前还没有内置太多的用户管理功能。

这与 Spark 相比如何?

在机构层面,Spark 似乎主要针对 ETL + 类似数据库的计算。虽然 Dask 的模块(如 Dask.bag 和 Dask.dataframe)可以很好地在这个领域发挥作用,但这似乎并不是近期讨论的重点。

最近的交流几乎完全围绕着支持交互式自定义并行(大量小任务之间具有复杂的依赖关系),而不是数据库或 Spark 中常见的 Map->Filter->Groupby->Join 等大型抽象。这并不是说这些操作不重要;这里存在很多选择偏差。我交谈过的人是那些 Spark/数据库显然不适合他们的人。他们正在解决更复杂、更异构且用户群体更广泛的问题。

我通常用一个类比来描述这种情况,将“大数据”系统比作城市中的交通方式。我们来看看:

  • 数据库就像火车:它在预先定义好的固定站点之间运行,效率高、速度快且可预测。这些是很多人通行的流行且盈利的路线(例如商业分析)。你确实需要自己从家里到达火车站(ETL),但一旦进入数据库/火车,你会感觉非常舒适。
  • Spark 就像汽车:它能用一个工具将你从家门直接送到目的地。虽然在长途部分可能不如火车快,但在一个系统中就能完成 ETL、数据库工作和一些机器学习任务,这非常方便。
  • Dask 就像全地形车:它能带你离开城市,在那些尚未充分探索过的崎岖地面上行驶。这非常适合 Python 社区,该社区通常会对新方法进行大量探索。你也可以把你的全地形车开到城里,完全没问题,但如果你想执行成千上万的 SQL 查询,那么你可能应该投资一个合适的数据库或 Spark。

再说一次,这里存在很多选择偏差,如果你想要的是一个数据库,那么你可能应该去使用数据库。Dask 不是数据库。

这也在很大程度上简化了问题。像 Oracle 这样的数据库有大量的 ETL 和分析工具,Spark 也因其灵活性而闻名,等等。我显然偏爱 Dask。你确实不应该完全相信一个项目作者对周边工具能力的公正无偏的看法。

结论

这是对当前关于“Dask 如何演进以支持机构用例”的讨论和未决问题的粗略概述。这个情况在从大学到对冲基金等各种机构中如此普遍,确实相当令人惊讶。

上面列出的问题绝不会阻碍采用。我没有列出那一百多个回答是“是的,这已经得到很好的支持”的问题。现在我看到 Dask 正被各种机构内的个人和小团队采用。这些个人和小团队正在将这种兴趣推向更高层级。距离任何拥有 1000+ 人的组织将 Dask 作为基础设施采用还需要几个月时间,但目前势头增长的速度相当令人鼓舞。

我还要感谢那些在各种基础设施、各种规模上,针对有趣问题实践 Dask 并报告了严重 bug 的几位无名人士。这些人没有出现在 GitHub 的 issue 跟踪器上,但他们在发现 bug 方面的作用是无价的。

随着对 Dask 的兴趣增长,看看它将如何演变是很有趣的。在文化上,Dask 成功地同时满足了开放科学界和私营部门的需求。该项目从双方都获得了资金支持和开源贡献。到目前为止,还没有出现任何利益冲突(大家都在大致相同的方向努力),我认为这对所有参与者来说都是一次非常富有成果的经历。


博客评论由 Disqus 提供支持