本周是 2021 年 Dask 峰会,我们在会上举办的其中一个研讨会涵盖了 Dask Distributed 的多种部署选项。

我们介绍了本地部署、SSH、Hadoop、Kubernetes、云和托管服务,但一个多次被问到的问题是:“我应该从哪里开始?”。

我想分享许多 Dask 用户所经历的历程,希望你也许能在这条路上找到自己的位置,这可能会为你指明下一步的方向。

起初

作为一名 Dask 新用户,你很可能正在学习文档教程

我们通常很早就介绍了分布式调度器的概念,但你并不需要它来获得 Dask 的初步好处。对于大于内存的数据集,从 Pandas 切换到 Dask 是一个常见的入门点,并且使用默认的线程调度器也能运行得非常好。

# Switching from this
import pandas as pd
df = pd.read_csv('/data/.../2018-*-*.csv')
df.groupby(df.account_id).balance.sum()

# To this
import dask.dataframe as dd
df = dd.read_csv('/data/.../2018-*-*.csv')
df.groupby(df.account_id).balance.sum().compute()

但当你阅读了几页文档后,你就已经被鼓励创建 Client()LocalCluster() 对象了。

注意:当你创建不带任何参数/配置的 Client() 时,Dask 会在底层为你启动一个 LocalCluster() 对象。所以通常 Client() 等同于 Client(LocalCluster())

这是用户经常停留的一个阶段,他们会启动一个本地分布式调度器,并在本地机器上最大化利用资源来完成工作。

from dask.distributed import Client
import dask.dataframe as dd

client = Client()

df = dd.read_csv('/data/.../2018-*-*.csv')
df.groupby(df.account_id).balance.sum().compute()

摆脱单机限制

一旦你熟悉了任务图和工作调度,你可能就会开始思考如何将计算扩展到本地机器之外。

我们的代码实际上不需要做太多改动,我们已经连接了一个客户端并在进行 Dask 工作,我们只需要更多具有相同用户环境、数据等的联网机器。

我个人曾在一个组织工作,那里每位研究人员的书桌下都有一台 Linux 台式机。这些机器连接在局域网 (LAN) 上,并通过 Active Directory 管理,用户主目录存储在存储服务器上。这意味着你可以在任何一台机器前坐下并登录,获得一致的体验。这也意味着你可以通过 SSH 连接到网络上的另一台机器,并且你的主目录也会在那里,包含你的所有文件,包括你的数据和 conda 环境。

这在许多组织中是一种常见的设置,而且很容易让你想通过 SSH 连接到那些可能没有充分利用自己机器的同事的机器上,并在那里运行你的工作。当然,我肯定你事先问过他们了!

组织也可能有机架式服务器专用于计算用途,设置会很相似。你可以通过 SSH 连接它们,并通过网络存储访问主目录和数据。

使用 Dask Distributed,你可以开始使用 SSHCluster 将你的工作负载扩展到这些机器上。你只需要设置好你的 SSH 密钥,以便无需密码即可登录这些机器。

from dask.distributed import Client, SSHCluster
import dask.dataframe as dd

cluster = SSHCluster(
    [
        "localhost",
        "alices-desktop.lan",
        "bobs-desktop.lan",
        "team-server.lan",
    ]
)
client = Client(cluster)

df = dd.read_csv("/data/.../2018-*-*.csv")
df.groupby(df.account_id).balance.sum().compute()

现在,相同的工作负载可以在我们这个小型临时集群的所有 CPU 上运行,利用所有内存,并从同一个共享存储中拉取数据。

迁移到计算平台

使用(甚至滥用)台式机和共享服务器等硬件可以让你走得相当远,但这很可能会让你的 IT 团队感到沮丧。

拥有许多用户尝试执行大型计算工作负载的组织很可能会考虑或已经拥有某种专门用于运行这些工作的平台。

你的组织拥有的平台是许多多少有些随意的技术选择的结果。你的公司使用哪些编程语言?采购时供应商提供了哪些优惠?当前的 IT 人员具备哪些技能?你的 CTO 在选择供应商的那天早餐吃了什么?

我并不是说这些决定是轻率做出的,但考虑的标准往往与你最终如何使用这些资源是正交的(即不相关的)。在 Dask,我们支持你的组织做出的任何平台决策。我们努力为尽可能多的流行平台构建部署工具,包括:

作为组织内的用户,你可能已经加入了其中一个平台。你可能已经获得了一些凭证,并接受了关于如何在上面启动作业的简单培训。

上面列出的 dask-foo 工具旨在构建在这些平台之上,并代表你提交作业,就像它们是独立的计算作业一样。但我们并不是向平台提交一个 Python 脚本,而是提交 Dask 调度器和工作器,然后连接到它们以利用已分配的资源。这是集群之上的集群。

通过这种方法,你的 IT 团队可以完全控制计算资源。他们可以通过配额和队列确保每个人获得公平的份额。但作为用户,你获得的 Dask 体验与你在本地机器上习惯的体验是相同的。

然而,在这些平台上,你的数据可能存储在略有不同的位置。例如,你可能在云上,而你的数据存储在对象存储中。幸好,我们有基于 fsspec 构建的工具,如 s3fsadlfs,我们可以以几乎相同的方式读取这些数据。所以,你的工作流程仍然没有太大变化。

from dask.distributed import Client
from dask_cloudprovider.azure import AzureVMCluster
import dask.dataframe as dd

cluster = AzureVMCluster(resource_group="<resource group>",
                         vnet="<vnet>",
                         security_group="<security group>",
                         n_workers=10)
client = Client(cluster)

df = dd.read_csv("adl://.../2018-*-*.csv")
df.groupby(df.account_id).balance.sum().compute()

集中管理 Dask 资源

当你的组织有足够多的用户采用和使用 Dask 时,可能就该是你的 IT 团队介入并为你提供托管服务的时候了。让许多用户以各种方式提交许多临时集群,可能不如由 IT 集中管理且更重要的是由 IT 指定的服务高效。

迁移到托管服务的动机通常是由组织层面驱动的,而不是个人。一旦你达到了这个阶段的 Dask 使用水平,你可能已经对你的工作流程感到非常满意,并且改变它们可能会带来不便。然而,为了达到这个阶段你可能已经积累了相当多的 Dask 部署知识,并且随着 Dask 在你组织内的使用量增长,指望每个人都达到同样的水平是不切实际的。

归根结底,部署分布式系统的专家可能并不在你的职位描述中,你可能有更重要的事情要做,比如数据科学、金融、物理、生物学,或者 Dask 正在帮助你完成的任何其他工作。

你也可能会感受到来自 IT 的一些压力。你正在运行集群之上的集群,对他们来说,你的 Dask 集群是一个黑箱,这可能会让他们感到不安,因为他们是负责这些硬件的人。感觉受到 IT 团队的限制是很常见的,我知道这一点,因为我曾是一名系统管理员,过去也限制过别人。但是你的 IT 团队的动机是好的,他们试图为组织节省资金,最佳地利用有限的资源,并最终将 IT 从你的工作中移除,这样你就可以专注于你的本职工作。所以,积极面对这一切,与他们沟通,分享你的 Dask 知识,并主动成为他们最终构建的任何解决方案的试点用户。

你可以推荐他们采用的一种方法是部署 Dask Gateway。这可以由管理员部署,并提供一个中央枢纽,代表用户启动 Dask 集群。它支持多种认证方式,因此可以与你的组织使用的任何系统集成,并支持许多与独立工具相同的后端计算平台,包括 Kubernetes、Hadoop 和 HPC。

这将使他们能够确保集群之间的安全设置正确且一致。如果你正在使用容器,他们可能会希望你使用一些定期更新和进行漏洞扫描的官方镜像。这也可能让他们更深入地了解人们正在运行的工作负载类型,并更准确地规划未来的系统。通过使用 Dask Gateway,这些控制和责任就转移到了他们那边。

用户需要向网关进行身份验证,但之后就可以以与平台无关的方式启动 Dask 集群。

from dask.distributed import Client
from dask_gateway import Gateway
import dask.dataframe as dd

gateway = Gateway(
    address="http://daskgateway.myorg.com",
    auth="kerberos"
)
cluster = gateway.new_cluster()
client = Client(cluster)

df = dd.read_csv("/data/.../2018-*-*.csv")
df.groupby(df.account_id).balance.sum().compute()

同样,读取你的数据需要一些关于数据存储在网关正在使用的底层计算平台上的方式的知识,但所需的改变是微乎其微的。

托管服务

如果你的组织太小,没有 IT 团队来为你管理这些,或者你只是偏爱托管服务,那么现在有一些初创公司正在提供这项服务,包括 CoiledSaturn Cloud

未来平台

如今,大型云供应商拥有托管的数据科学平台,包括 AWS SagemakerAzure Machine LearningGoogle Cloud AI Platform。但这些平台目前不包含 Dask 即服务(Dask as a service)。

这些云服务目前主要专注于批处理和机器学习,但这些云平台也提供了 Spark 和其他计算集群的托管服务。随着 Dask 越来越受欢迎,如果这些云供应商在未来几年发布托管的 Dask 服务,我不会感到惊讶。

总结

Dask 最强大的特性之一是,无论分布式计算集群的规模或复杂性如何,你的代码都可以保持基本不变。它可以轻松地从单机扩展到数千台服务器。

但扩展规模需要用户和组织的共同成长,而人们似乎已经在这一过程中走上了一条共同的道路。

希望这篇文章能让你了解自己在这条道路上处于哪个位置,以及下一步该走向何方。无论你是社区新手,正在探索多核计算的力量,还是经验丰富的老手,正试图管理数百名热爱 Dask 的用户,祝你好运!


博客评论由 Disqus 提供