Dask-jobqueue 轻松在 PBS, Slurm, MOAB, SGE, 和 LSF 等作业排队系统中部署 Dask。
作者:Joe Hamman
这项工作是与 Matthew Rocklin (Anaconda), Jim Edwards (NCAR), Guillaume Eynard-Bontemps (CNES) 和 Loïc Estève (INRIA) 合作完成的,部分得到美国国家科学基金会 Earth Cube 项目的支持。dask-jobqueue 包是 Pangeo 项目的一个衍生品。这篇博文曾 发布于此
TLDR; Dask-jobqueue 让你能够轻松地在采用各种作业排队系统(如 PBS、Slurm、SGE 或 LSF)的 HPC 集群上部署 dask。Dask-jobqueue 提供了一个 Pythonic 用户界面,通过在 HPC 系统上提交、执行和删除单个作业来管理 dask worker/集群。它赋予用户在大型 HPC 系统上交互式地扩展工作负载的能力;将交互式 Jupyter Notebook 变成一个强大的工具,用于对超大型数据集进行可伸缩计算。
安装方式:
conda install -c conda-forge dask-jobqueue
或者
pip install dask-jobqueue
并查阅 dask-jobqueue 文档:https://jobqueue.dask.org.cn
介绍
大型高性能计算机(HPC)集群在整个计算科学领域无处不在。这些 HPC 系统包含强大的硬件,包括许多大型计算节点、高速互连和并行文件系统。我们在 NCAR 使用的一个此类系统的例子名为 Cheyenne。Cheyenne 是一台相当大的机器,拥有约 15 万个核心和超过 300 TB 的总内存。
Cheyenne 是由 NCAR 运营的一台 5.34 petaflops 的高性能计算机。
这些系统经常使用作业排队系统(如 PBS、Slurm 或 SGE)来管理众多用户提交的许多并发作业的排队和执行。“作业”是指程序在用户 HPC 系统上的某些资源集上的一次单独执行。这些作业通常通过命令行提交:
qsub do_thing_a.sh
其中 do_thing_a.sh 是一个 shell 脚本,可能看起来像这样:
#!/bin/bash
#PBS -N thing_a
#PBS -q premium
#PBS -A 123456789
#PBS -l select=1:ncpus=36:mem=109G
echo “doing thing A”
在这个例子中,“-N”指定了此作业的名称,“-q”指定了应运行作业的队列,“-A”指定了一个用于计费作业运行时所用 CPU 时间的项目代码,“-l”指定了此作业的硬件规格。每个作业排队系统在配置和提交这些作业时语法略有不同。
这种接口导致了几种常见工作流程模式的发展:
-
如果你想进行扩展,请使用 MPI。MPI 代表消息传递接口(Message Passing Interface)。它是一种广泛采用的接口,允许在传统 HPC 集群上进行并行计算。许多大型计算模型使用 C 和 Fortran 等语言编写,并使用 MPI 来管理其并行执行。对于经验丰富的人来说,这是扩展复杂计算的首选解决方案。
-
批量处理。科学处理流程通常包含一些步骤,这些步骤可以通过并行提交多个作业轻松实现并行化。也许你想用稍微不同的输入运行“do_thing_a.sh”500 次——很简单,只需单独提交所有作业(或者在某些排队系统中被称为“数组作业”)。
-
串行也行。现在的电脑速度很快,不是吗?也许你根本不需要并行化你的程序。好吧,那就保持串行,并在作业运行时去喝杯咖啡。
问题
上述任何一种工作流程模式都无法对超大数据进行交互式分析。当我原型化新的处理方法时,我通常希望交互式工作,比如在 Jupyter Notebook 中。即时编写 MPI 代码既困难又耗时,批量作业本质上不是交互式的,而当我开始处理 TB 级的数据时,串行方式根本无法胜任。我们的经验是,这些工作流程通常相当不优雅,并且难以在应用程序之间迁移,导致在此过程中产生大量重复劳动。
Pangeo 项目的目标之一是促进对超大数据集进行交互式处理。Pangeo 利用 Jupyter 和 dask,以及一些更领域特定的包(如 xarray)来实现这一点。问题在于我们没有一种特别令人满意的在 HPC 集群上部署 dask 的方法。
系统
-
Jupyter Notebooks 是支持交互式代码执行、图形和动画显示以及内嵌解释性文本和公式的 Web 应用程序。它们正迅速成为 Python 中交互式计算的标准开源格式。
-
Dask 是一个用于并行计算的库,与 Python 现有的科学软件生态系统(包括 NumPy、Pandas、Scikit-Learn 和 xarray 等库)很好地协调。在许多情况下,它让用户能够将现有工作流程快速扩展到更大的应用。*Dask-distributed* 是 dask 的一个扩展,有助于在多台计算机上并行执行。
-
Dask-jobqueue 是我们构建的一个新的 Python 包,旨在促进 dask 在 HPC 集群上的部署以及与多种作业排队系统的接口。它的用法简洁且具有 Pythonic 风格。
from dask_jobqueue import PBSCluster
from dask.distributed import Client
cluster = PBSCluster(cores=36,
memory="108GB",
queue="premium")
cluster.scale(10)
client = Client(cluster)
幕后发生了什么?
-
在调用 PBSCluster() 时,我们告诉 dask-jobqueue 如何配置每个作业。在这种情况下,我们将每个作业设置为拥有 1 个 Worker,每个 Worker 使用 36 个核心(线程)和 108 GB 内存。我们还告诉 PBS 排队系统,我们希望将此作业提交到“premium”队列。此步骤还会启动一个 Scheduler 来管理稍后将添加的 worker。
-
直到我们调用 cluster.scale() 方法,我们才与 PBS 系统交互。在这里,我们启动 10 个 worker,或等效地启动 10 个 PBS 作业。对于每个作业,dask-jobqueue 会创建一个类似于上面所示的 shell 命令(只是调用的是 dask-worker 而不是 echo),并通过子进程调用提交作业。
-
最后,我们通过实例化 Client 类连接到集群。从这里开始,我们代码的其余部分看起来就像我们使用 dask 的本地调度器 一样。
Dask-jobqueue 易于定制,帮助用户利用高级 HPC 功能。一个更复杂的、可以在 NCAR 的 Cheyenne 超级计算机上运行的例子是:
cluster = PBSCluster(cores=36,
processes=18,
memory="108GB",
project='P48500028',
queue='premium',
resource_spec='select=1:ncpus=36:mem=109G',
walltime='02:00:00',
interface='ib0',
local_directory='$TMPDIR')
在这个例子中,我们指示 PBSCluster:1) 每个作业最多使用 36 个核心,2) 每个作业使用 18 个 worker 进程,3) 使用每个拥有 109 GB 大内存的节点,4) 使用比标准更长的运行时间,5) 使用 InfiniBand 网络接口 (ib0),以及 6) 使用快速 SSD 磁盘作为其本地目录空间。
最后,Dask 提供根据一组启发式规则“自动伸缩”集群的能力。当集群需要更多 CPU 或内存时,它会进行扩展。当集群有未使用的资源时,它会缩减。Dask-jobqueue 通过一个简单的接口支持此功能:
cluster.adapt(minimum=18, maximum=360)
在此示例中,我们告诉集群在 18 到 360 个 worker(或 1 到 20 个作业)之间自动伸缩。
演示
我们整理了一个相当全面的屏幕录像,引导用户完成在 HPC 集群上设置 Jupyter 和 Dask(以及 dask-jobqueue)的所有步骤:
结论
Dask jobqueue 使得在 HPC 集群上部署 Dask 变得更加容易。该包为常见的作业排队系统提供了 Pythonic 风格的接口。它也易于定制。
自动伸缩功能提供了一种在 HPC 集群上进行科学研究的根本不同方式。启动您的 Jupyter Notebook,实例化您的 dask 集群,然后进行科学计算——让 dask 根据计算需求决定何时扩展和缩减。我们认为这种交互式并行计算的爆发式方法提供了许多好处。
最后,在开发 dask-jobqueue 的过程中,我们遇到了一些值得一提的挑战。
-
排队系统是高度可定制的。系统管理员似乎对其各自的排队系统实现方式拥有很大控制权。实际上,这意味着很难同时覆盖特定排队系统的所有变体。我们普遍发现,事情似乎足够灵活,对于不灵活的情况,我们欢迎反馈意见。
-
CI 测试需要相当多的设置工作。使用 dask-jobqueue 的目标环境是现有的 HPC 集群。为了方便对 dask-jobqueue 进行持续集成测试,我们必须配置多个排队系统(PBS、Slurm、SGE),使其使用 Travis CI 在 docker 中运行。这是一项艰苦的任务,我们仍在努力完善。
-
我们构建 dask-jobqueue 以在 dask-deploy 框架中运行。如果您熟悉 dask-kubernetes 或 dask-yarn,您也会认出 dask-jobqueue 中的基本语法。这些 dask 部署包的同时开发最近引发了一些重要的协调讨论(例如 https://github.com/dask/distributed/issues/2235)。
博客评论由 Disqus 提供支持