配置分布式 Dask 集群 新手指南
作者:Laura Lorenz (Prefect),Julia Signell (Saturn Cloud)
配置 Dask 集群一开始可能令人望而生畏,但好消息是 Dask 项目内置了许多启发式算法,它们会尽力根据部署机器和接收的工作来预测并适应您的工作负载。很可能在很长一段时间内,您甚至根本不需要进行任何特殊配置。话虽如此,如果您正寻求一些技巧来摆脱在本地使用 Dask,或者您已准备好通过更深入的配置来优化现有的 Dask 集群,这些提示和技巧将为您提供指导,并引导您找到关于此主题的最佳 Dask 文档!
如何托管分布式 Dask 集群
对我来说,最大的飞跃是从开发过程中一次只运行一小时左右的本地版 Dask,转变为搭建生产就绪的 Dask 版本。大致有两种风格
- 静态 Dask 集群 – 始终开启、始终在线、随时准备接受工作
- 临时性 Dask 集群 – 可以通过 Python API 轻松启动或关闭,启动后会运行一个最小的 Dask 主节点,该节点只在实际提交工作时才启动 Dask worker
虽然这是两大主要类别,但实际实现方法有无数种选择。这取决于许多因素,包括您想使用哪些云服务提供商产品,这些资源是否已为您预配,以及您是想使用 Python API 还是其他部署工具来实际启动 Dask 进程。Dask 文档的 Setup 部分非常详尽地列出了所有可能的 Dask 集群供应方式。简单介绍一下这些文档中描述的内容,您可以选择
- 在您预配的云实例(如 AWS EC2 或 GCP GCE)上,通过 CLI 手动安装并启动 Dask 进程
- 使用流行的部署接口,例如 用于 kubernetes 的 helm,在您预配的云容器集群(如 AWS Fargate 或 GCP GKE)中部署 Dask
- 使用 Dask 开发者提供的“原生”部署 Python API,在其支持的部署基础设施上创建(并交互式配置)Dask,可以通过支持多种后端的通用 Dask Gateway,或直接对接集群管理器,如通过 dask-kubernetes 对接 kubernetes 或通过 dask-yarn 对接 YARN,前提是您已经预配好了 kubernetes 集群或 hadoop 集群
- 使用一个提供近乎全套服务的部署 Python API,称为 Dask Cloud Provider,它会更进一步,为您供应集群,只需您提供 AWS 凭据即可(截至撰写本文时,它仅支持 AWS)
正如您所见,有很多选择。除此之外,您还可以与托管服务提供商合作,让他们根据您的规范为您供应和配置 Dask 集群,例如 Saturn Cloud(免责声明:其中一位作者 (Julia Signell) 在 Saturn Cloud 工作)。
无论您选择哪种方式,重点都是释放 Dask 提供的 Python 并行计算能力,并以尽可能可扩展的方式实现,这正是将其运行在分布式基础设施上的意义所在。一旦您确定了将在何处以及使用何种 API 部署您的 Dask 集群,针对您的 Dask 集群及其工作负载的实际配置过程便开始了。
如何为集群选择实例类型
当您准备好为生产环境搭建 Dask 集群时,您需要对 scheduler 和 worker 运行所依赖的基础设施做出一些决策,特别是如果您使用了 如何托管分布式 Dask 集群 中需要预配基础设施的选项。无论您的基础设施是本地部署(on-prem)还是在云端(in the cloud),都需要做出经典的决策点
- 内存需求
- CPU 需求
- 存储需求
如果您已在本地测试了您的工作负载,一个简单的启发式方法是,将您的工作所需的 CPU、存储和内存使用量乘以一个倍数,该倍数与您的本地实验相对于预期生产使用的缩减程度相关。例如,如果您在本地使用 10% 的数据样本测试工作负载,将观测到的任何资源使用量乘以至少 10,可能就会接近您的最小实例大小。尽管实际上 Dask 的许多底层优化意味着它通常不需要资源的线性增长来处理更多数据,但这个简单的启发式方法可以作为一个不错的初步尝试技术,为您提供一个起点。
同样地,选择最小的实例,使用预定的数据子集运行,然后逐步扩展直到有效运行,这也能为您找到最小实例大小提供线索。如果您的本地环境算力不足,无法使用 10% 或更多的源数据在本地运行您的流程,或者如果本地环境差异很大(例如操作系统不同,或者有许多竞争性应用程序在后台运行),又或者如果使用本地机器监视流程执行的 CPU、内存和存储很困难或令人烦恼,那么在最小的可用节点上隔离测试用例是一个更好的选择。
另一方面,选择您可以承受的最大实例,观察 CPU/内存/存储最大指标与基于未使用资源比例缩减之间的差异,这可能是一种更快找到理想大小的方法。
您最终确定的节点大小可能很大程度上取决于您的预算,但只要您的节点大小足够大,能够避免严格的内存不足错误,那么使用接近最小运行规格的节点所付出的代价就是时间。鉴于 Dask 集群的目的是运行分布式并行计算,如果您扩大实例以实现更多并行性,可以显著节省时间。如果您的模型运行时间很长,训练需要数小时,而您可以将其缩短至几分钟,从而节省您自己或员工的时间,以便快速看到反馈循环,那么超出最小规格进行扩展是值得的。
您的 scheduler 节点和 worker 节点应该大小相同吗?配置不同实例大小来优化资源当然很诱人。快速深入了解它们各自的一般资源需求是很有价值的。
对于 scheduler 来说,提交给它的每个任务的序列化版本会保存在内存中,直到它确定哪个 worker 应该接收该工作。这与实际执行任务所需的内存量不一定相同,但在这里吝啬内存可能会阻止工作被调度。从 CPU 的角度看,scheduler 的需求可能远低于 worker,但如果 scheduler 缺少 CPU 会导致死锁,而当 scheduler 卡住或死亡时,您的 worker 也无法获得任何工作。在存储方面,Dask scheduler 不会将太多内容持久化到磁盘,即使是临时性的,因此其存储需求非常低。
对于 worker 来说,您的任务代码的特定资源需求可能会超过我们能做出的任何通用概括。至少,它们需要足够的内存和 CPU 来反序列化每个任务负载,并再次将其序列化,作为 Future 返回给 Dask scheduler。Dask worker 可能会将计算结果持久化到内存中,包括分布式存储在集群的内存中,您可以在这里阅读更多相关信息。关于存储需求,从根本上说,提交给 Dask worker 的任务不应写入本地存储——scheduler 不保证工作会在某个特定的 worker 上运行——因此存储成本应直接与 worker 依赖项的安装占用空间以及 Dask worker 的任何临时存储相关。worker 创建的临时文件可能包括在内存不足时将内存数据溢出到本地磁盘,只要该行为未被禁用,这意味着减少内存可能会影响您的临时存储需求。
通常我们建议简化流程,将 scheduler 和 worker 节点设置为相同的节点大小,但如果您想优化它们,可以使用上述 CPU、内存和存储模式作为单独配置它们的起点。
如何选择 worker 数量
每个 Dask 集群都有一个 scheduler 和任意数量的 worker。scheduler 跟踪需要完成的工作和已完成的工作。worker 执行工作,相互之间共享结果,并向 scheduler 报告。关于这方面的更多背景信息可在 dask.distributed 文档中找到。
搭建 Dask 集群时,您必须决定使用多少个 worker。使用大量 worker 可能很诱人,但这并非总是好事。如果您使用太多 worker,有些 worker 可能没有足够的工作可做,大部分时间处于空闲状态。即使它们有足够的工作,它们之间可能需要共享数据,这会很慢。此外,如果您的机器资源有限(而不是每个 worker 一个节点),那么每个 worker 的能力会较弱——它们可能内存不足,或者完成一个任务需要很长时间。
另一方面,如果您使用的 worker 太少,就无法充分利用 Dask 的并行性,您的工作整体完成时间可能会更长。
在决定使用多少个 worker 之前,请先尝试使用默认设置。在许多情况下,Dask 可以选择一个充分利用您的机器大小和形状的默认设置。如果默认设置不起作用,那么您需要了解一些关于您的工作大小和形状的信息。特别是您需要知道
- 您的计算机大小是多少?或者您可以访问哪些类型的计算节点?
- 您的数据有多大?
- 您尝试进行的计算结构是什么?
如果您在本地机器上工作,计算机的大小是固定的且已知。如果您在 HPC 或云实例上工作,那么您可以选择分配给每个 worker 的资源。您根据我们在 如何为集群选择实例类型 中讨论的因素来决定集群的大小。
Dask 通常用于数据量过大无法完全载入内存的情况。在这些情况下,数据被分割成 chunk 或 partition。每个任务都在一个 chunk 上计算,然后对结果进行聚合。您将在下面了解如何改变数据的形状。
计算的结构可能是最难理解的。如果可能的话,尝试在数据的一个非常小的子集上运行计算会很有帮助。您可以通过调用 .visualize()
查看特定计算的任务图。如果任务图太大,无法在内联舒适地查看,那么可以查看 Dask dashboard 的图形标签页。它会显示任务图的运行过程,并点亮每个部分。为了让 Dask 最有效率,您需要一个不太大或相互连接不太紧密的任务图。Dask 文档讨论了几种优化任务图的技术。
要选择使用的 worker 数量,请考虑在任务图的任何给定部分有多少并发任务正在运行。如果每个任务都包含非平凡(non-trivial)的工作量,那么运行 Dask 最快的方法是为每个并发任务配备一个 worker。对于分块的数据,如果每个 worker 都能轻松地将一个数据块载入内存并在其上进行计算,那么块的数量应该是 worker 数量的倍数。这确保了 worker 总是有足够的工作可做。
如果您的任务数量变化很大,那么您也可以考虑使用自适应集群。在自适应集群中,您可以设置 worker 的最小和最大数量,并让集群根据需要添加和移除 worker。当 scheduler 确定某些 worker 不再需要时,它会要求集群将其关闭;当需要更多 worker 时,scheduler 会要求集群启动更多。这对于任务图很有效,比如任务图开始时输入任务较少,中间任务增多,最后是一些聚合或归约操作。
一旦您启动了一些 worker 的集群,您就可以在 Dask dashboard 中监控它们的进度。您可以在那里检查它们的内存消耗,观察它们在任务图中的进度,并访问 worker 级别的日志。以这种方式观察您的计算,可以深入了解潜在的提速机会,并培养关于未来使用 worker 数量的直觉。
选择 worker 数量的棘手之处在于,实际上您的机器、数据和任务图的大小和形状可能会发生变化。弄清楚要使用多少个 worker 可能感觉就像没完没了地调整旋钮。如果这让您感到烦躁,请记住,即使集群正在运行,您也可以随时更改 worker 的数量。
如何选择 nthreads 以利用多线程
在启动 Dask worker 本身时,有两个非常重要的配置选项需要相互权衡:worker 数量和每个 worker 的线程数。实际上,您可以在同一个 worker 进程上通过 flag 来控制两者,例如使用 dask-worker --nprocs 2 --nthreads 2
的形式,不过 --nprocs
只是在后台启动另一个 worker,因此更清晰的配置方式是避免设置 --nprocs
,而是使用您用于指定 worker 总数的方式来控制该配置。我们已经讨论了如何选择 worker 数量,但如果您改变 worker 的 --nthreads
设置以增加单个 worker 可以完成的工作量,您可能会修改之前的决定。
在决定 worker 的最佳 nthreads
数量时,关键在于您期望这些 worker 完成什么类型的工作。基本原则是,多个线程最适合在任务之间共享数据,但如果运行的代码不释放 Python 的 GIL(“全局解释器锁”),效果就会很差。对于不释放 Python GIL 的工作,增加 nthreads
没有效果;如果 GIL 被锁定,worker 无法使用线程来优化计算速度。这对于希望增加并行性,但却看不到增加 worker 线程限制带来任何收益的新 Dask 用户来说,可能是一个困惑点。
如 Dask worker 文档中所述,有一些经验法则可以判断何时需要担心 GIL 锁定问题,从而优先选择更多 worker,而不是使用高 nthreads
的单个重量级 worker:
- 如果您的代码主要处理非数值数据,并且大部分是纯 Python 代码(使用非优化的 Python 库)
- 如果您的代码导致 Python 外部的计算长时间运行且没有显式释放 GIL
方便的是,许多 Dask 用户专门使用针对多线程优化的 Python 库进行数值计算,即 PyData 生态系统中的 NumPy、Pandas、SciPy 等。如果您主要使用这些或类似优化的库进行数值计算,则应侧重于更高的线程数。如果您确实主要进行数值计算,您可以指定的总线程数可以等于您的核心数;如果您正在进行任何可能导致线程暂停的工作,例如任何 I/O 操作(比如将结果写入磁盘),您可以指定更多的线程,因为有些线程会偶尔处于空闲状态。在这种情况下,设置比核心数多多少线程的理想数量很难估计,并且取决于您的工作负载,但参考 concurrent.futures 中的一些建议,对于严重依赖 I/O 的工作负载,总线程数的历史上限是机器处理器数量的 5 倍。
如何对数组进行分块和对 DataFrames 进行分区
在 Dask 中有很多不同的触发工作的方法。例如:您可以使用 delayed 包装函数,或直接向 client 提交工作(选项比较请参阅用户界面)。如果您将结构化数据加载到 Dask 对象中,那么您很可能在使用 dask.array 或 dask.dataframe。这些模块分别模仿 numpy 和 pandas——使得处理大型数组和大型表格数据集更加容易。
使用 dask.dataframe 和 dask.array 时,计算通过将数据分割成块来分配给 worker。在 dask.dataframe 中,这些块称为分区 (partitions),在 dask.array 中称为块 (chunks),但原理是相同的。在 dask.array 的情况下,每个块包含一个 numpy 数组,而在 dask.dataframe 的情况下,每个分区包含一个 pandas dataframe。无论哪种方式,每个块/分区都包含数据的一小部分,但它代表了整体,并且必须足够小以轻松地放入 worker 内存。
通常在载入数据时,分区/块会自动确定。例如,从包含许多 csv 文件的目录中读取时,每个文件将成为一个分区。如果您的数据默认没有分割,则可以使用 df.set_index 或 array.rechunk 手动进行分割。如果它们默认已分割,但您想更改块的形状,文件级别的块应该是 Dask 级别块的倍数(在这里阅读更多相关信息)。
作为用户,您了解数据将如何使用,因此通常可以以有助于提高计算效率的方式进行分区。例如,如果您要按月进行聚合,沿着时间轴分块可能是有意义的。如果改为查看不同高度的特定特征,沿着高度轴分块可能更有意义。关于 dask.arrays 分块的更多技巧在最佳实践中有所描述。另一个可能需要重新分区的情况是,如果您将数据过滤到原始数据的一个子集。在这种情况下,您的分区可能会太小。有关如何处理这种情况的更多详细信息,请参阅 dask.dataframe 最佳实践。
在选择块大小时,最好不要太小,也不要太大(通常 100MB 左右是合理的)。每个块需要能够放入 worker 内存,并且在该块上的操作应该花费非平凡的时间(超过 100ms)。要获取更多建议,请查看关于块和分区的文档。
我们希望这能帮助您决定是否以不同方式配置您的 Dask 部署,并给您尝试的信心。我们在 Dask 文档中找到了所有这些重要的信息,所以如果您感到启发,请点击我们贯穿全文提供的链接,了解更多关于 Dask 的内容!
由 Disqus 提供支持的博客评论