Dask 在传统 HPC 机器上的部署频率越来越高。在过去的一周里,我亲自帮助了四个不同的团队完成设置。这是一个令人惊讶的个性化过程,因为每台 HPC 机器都有自己的特点。每台机器都使用作业调度程序,如 SLURM/PBS/SGE/LSF/…,网络文件系统和快速互连,但这些子系统在不同机器上都有略微不同的策略,这正是问题棘手之处。

通常情况下,如果我们同时具备以下两点,我们可以在大约 30 分钟内解决这些问题:

  • 熟悉机器的人,例如高级用户或 IT 管理员
  • 熟悉 Dask 设置的人

这些系统的规模范围很广。本周我在这个规模范围的不同两端都看到了:

  • 生物成像实验室内部用于研究工作的小型内部 24 节点 SLURM 集群
  • Summit,世界上最强大的超级计算机

在这篇文章中,我将分享一些我在处理 Summit 时遇到的情况,那特别令人头疼。希望这能让大家了解可能出现的情况。这些技巧可能不适用于您的特定系统,但希望它们能让您体会到哪些地方可能出错,以及我们追踪问题的方法。

Power 架构

首先,Summit 是一台 IBM PowerPC 机器,这意味着在普通 Intel 芯片上编译的软件包将无法工作。幸运的是,Anaconda 提供了一个适用于 Power 架构的分发版本下载,这为我提供了一个很好的起点。

https://anaconda.net.cn/distribution/#linux

包看起来比普通发行版要旧几个月,但这我可以接受。

安装 Dask-Jobqueue 并配置基本信息

我们需要告诉 Dask 每台机器上有多少个核心和多少内存。这个过程相当简单,在 jobqueue.dask.org 上有详细的文档和一个信息丰富的截屏视频,甚至可以通过错误消息进行自我引导。

In [1]: from dask_jobqueue import PBSCluster
In [2]: cluster = PBSCluster()
ValueError: You must specify how many cores to use per job like ``cores=8``

我暂时跳过这一节,因为一般来说,新手用户可以处理这个问题。更多信息,请考虑观看这个 YouTube 视频(30 分钟)。

作业脚本中的无效操作

所以我们创建一个包含所有信息的集群对象,调用 .scale,然后从作业调度程序那里收到一些错误消息。

from dask_jobqueue import LSFCluster
cluster = LSFCluster(
    cores=128,
    memory="600 GB",
    project="GEN119",
    walltime="00:30",
)
cluster.scale(3)  # ask for three nodes
Command:
bsub /tmp/tmp4874eufw.sh
stdout:

Typical usage:
  bsub [LSF arguments] jobscript
  bsub [LSF arguments] -Is $SHELL
  bsub -h[elp] [options]
  bsub -V

NOTES:
 * All jobs must specify a walltime (-W) and project id (-P)
 * Standard jobs must specify a node count (-nnodes) or -ln_slots. These jobs cannot specify a resource string (-R).
 * Expert mode jobs (-csm y) must specify a resource string and cannot specify -nnodes or -ln_slots.

stderr:
ERROR: Resource strings (-R) are not supported in easy mode. Please resubmit without a resource string.
ERROR: -n is no longer supported. Please request nodes with -nnodes.
ERROR: No nodes requested. Please request nodes with -nnodes.

Dask-Jobqueue 试图根据您提供的输入生成一个合理的作业脚本,但是您使用的资源管理器可能有该集群特有的附加策略。我们通过查看生成的脚本,并将其与已知在 HPC 机器上工作的脚本进行比较来调试此问题。

print(cluster.job_script())
#!/usr/bin/env bash

#BSUB -J dask-worker
#BSUB -P GEN119
#BSUB -n 128
#BSUB -R "span[hosts=1]"
#BSUB -M 600000
#BSUB -W 00:30
JOB_ID=${LSB_JOBID%.*}

/ccs/home/mrocklin/anaconda/bin/python -m distributed.cli.dask_worker tcp://scheduler:8786 --nthreads 16 --nprocs 8 --memory-limit 75.00GB --name name --nanny --death-timeout 60 --interface ib0 --interface ib0

在与已知在 Summit 上工作的现有脚本进行比较后,我们修改关键字以在头文件中添加和删除某些行。

cluster = LSFCluster(
    cores=128,
    memory="500 GB",
    project="GEN119",
    walltime="00:30",
    job_extra=["-nnodes 1"],          # <--- new!
    header_skip=["-R", "-n ", "-M"],  # <--- new!
)

当我们调用 scale 时,这似乎让 LSF 满意了。它不再输出大量错误消息。

>>> cluster.scale(3)  # things seem to pass
>>>

Worker 无法连接到 Scheduler

所以从 LSF 的角度来看,一切似乎都很好,但是当我们客户端连接到我们的集群时,看不到任何东西到达。

>>> from dask.distributed import Client
>>> client = Client(cluster)
>>> client
<Client: scheduler='tcp://10.41.0.34:41107' processes=0 cores=0>

有两件事需要检查,作业是否真的通过了队列?通常我们使用资源管理器操作来检查,例如 qstat, squeuebjobs。也许我们的作业被困在队列中了?

$ bash
JOBID   USER       STAT   SLOTS    QUEUE       START_TIME    FINISH_TIME   JOB_NAME
600785  mrocklin   RUN    43       batch       Aug 26 13:11  Aug 26 13:41  dask-worker
600786  mrocklin   RUN    43       batch       Aug 26 13:11  Aug 26 13:41  dask-worker
600784  mrocklin   RUN    43       batch       Aug 26 13:11  Aug 26 13:41  dask-worker

不对,看起来它们处于运行状态。现在我们去查看它们的日志。有时追踪您作业的日志文件可能很棘手,但您的 IT 管理员应该知道它们在哪里。通常它们就在您运行作业的目录,文件名中包含作业 ID。

$ cat dask-worker.600784.err
distributed.worker - INFO -       Start worker at: tcp://128.219.134.81:44053
distributed.worker - INFO -          Listening to: tcp://128.219.134.81:44053
distributed.worker - INFO -          dashboard at:       128.219.134.81:34583
distributed.worker - INFO - Waiting to connect to: tcp://128.219.134.74:34153
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                         16
distributed.worker - INFO -                Memory:                   75.00 GB
distributed.worker - INFO -       Local Directory: /autofs/nccs-svm1_home1/mrocklin/worker-ybnhk4ib
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Waiting to connect to: tcp://128.219.134.74:34153
distributed.worker - INFO - Waiting to connect to: tcp://128.219.134.74:34153
distributed.worker - INFO - Waiting to connect to: tcp://128.219.134.74:34153
distributed.worker - INFO - Waiting to connect to: tcp://128.219.134.74:34153
distributed.worker - INFO - Waiting to connect to: tcp://128.219.134.74:34153
distributed.worker - INFO - Waiting to connect to: tcp://128.219.134.74:34153
...

所以 worker 进程已经启动了,但它们连接到 scheduler 时遇到了困难。当我们咨询 IT 管理员时,他们指出这里的地址在错误的网络接口上。

128.219.134.74  <--- not accessible network address

于是我们运行 ifconfig,找到 Infiniband 网络接口 ib0,它更易于访问。

cluster = LSFCluster(
    cores=128,
    memory="500 GB",
    project="GEN119",
    walltime="00:30",
    job_extra=["-nnodes 1"],
    header_skip=["-R", "-n ", "-M"],
    interface="ib0",                    # <--- new!
)

我们尝试了一下,但仍然不行 :(

交互式节点

专家用户接着说:“哦,我们的登录节点非常严格,我们试试从一个交互式计算节点来运行。在那里通常会好很多。”我们运行了一些神秘的 bash 命令(我从未见过两个相似的,所以这里省略了),然后一切就奇迹般地开始工作了。太好了!

我们运行一个微小的 Dask 计算来证明我们可以完成一些工作。

>>> client = Client(cluster)
>>> client.submit(lambda x: x + 1, 10).result()
11

实际上,我们最终还是能够通过 LSF 中稍微不同的 bsub 命令,在 Summit 的登录节点上运行 Dask,但这里我将省略细节,因为我们正在 Dask 中修复这个问题,这不太可能影响未来的用户(我希望如此?)。严格的登录节点仍然是各种系统上连接失败的常见原因。在我接触过的系统中,大约有 30% 是这种情况。

SSH 隧道

让 dashboard 运行起来很重要,这样您才能看到发生了什么。通常我们通过 SSH 隧道来实现这一点。大多数 HPC 用户都知道如何做,并且在上面的 YouTube 截屏视频中也涵盖了,所以我这里将跳过它。

JupyterLab

如今许多 HPC 上的交互式 Dask 用户正在转向使用 JupyterLab。这个选择为他们提供了笔记本、终端、文件浏览器和 Dask 的 dashboard,所有这些都在一个单独的网页标签中。这大大减少了他们需要通过 SSH 连接的次数,并且借助 web 代理的魔力,意味着他们只需要建立一次隧道。

我通过 conda 安装了 JupyterLab 和一个代理库,然后尝试设置 Dask JupyterLab 扩展

conda install jupyterlab
pip install jupyter-server-proxy  # to route dashboard through Jupyter's port

接下来,我们将把 Dask Labextension 安装到 JupyterLab 中,以便将 Dask Dashboard 直接集成到我们的 Jupyter 会话中。为此,我们需要 nodejs 来将东西安装到 JupyterLab 中。考虑到 Power 架构,我以为这会很麻烦,但令人惊讶的是,这也似乎在 Anaconda 的默认 Power 通道中。

[email protected] $ conda install nodejs  # Thanks conda packaging devs!

然后我安装了 Dask-Labextension,它既是一个 Python 包也是一个 JavaScript 包。

pip install dask_labextension
jupyter labextension install dask-labextension

然后我为我的 Jupyter 会话设置了一个密码

jupyter notebook password

并以网络友好的方式运行 JupyterLab

[email protected] $ jupyter lab --no-browser --ip="login2"

并从我的本地机器到登录节点建立一个单一的 SSH 隧道

# Be sure to match the login node's hostname and the Jupyter port below

mrocklin@my-laptop $ ssh -L 8888:login2:8888 summit.olcf.ornl.gov

我现在可以从我的笔记本电脑连接到 Jupyter,只需导航到 http://localhost:8888,在笔记本中运行上述集群命令,一切都很顺利。此外,得益于 jupyter-server-proxy,Dask 的 dashboard 也可在 http://localhost:8888/proxy/####/status 访问,其中 #### 是当前托管 Dask dashboard 的端口。您可以通过查看 cluster.dashboard_link 来找到这个端口。它默认为 8787,但如果您最近在系统上启动了一堆 Dask scheduler,该端口可能被占用,因此 Dask 不得不使用随机端口。

配置文件

我不想一直输入所有这些命令,所以现在我把这些东西放进一个配置文件中,然后把这个文件放到 ~/.config/dask/summit.yaml 中(任何以 .yaml 结尾的文件名都可以)。

jobqueue:
  lsf:
    cores: 128
    processes: 8
    memory: 500 GB
    job-extra:
      - "-nnodes 1"
    interface: ib0
    header-skip:
      - "-R"
      - "-n "
      - "-M"

labextension:
  factory:
    module: "dask_jobqueue"
    class: "LSFCluster"
    args: []
    kwargs:
      project: your-project-id

Worker 启动慢

现在事情变得更容易使用了,我发现自己使用系统的次数更多了,也出现了一些其他问题。

我注意到启动一个 worker 需要很长时间。它似乎在启动过程中间歇性地挂起,所以我向 distributed/__init__.py 添加了几行代码,每秒打印出主 Python 线程的状态,看看这是在哪里发生的。

import threading, sys, time
from . import profile

main_thread = threading.get_ident()

def f():
    while True:
        time.sleep(1)
        frame = sys._current_frames()[main_thread]
        print("".join(profile.call_stack(frame)

thread = threading.Thread(target=f, daemon=True)
thraed.start()

这会打印出一个回溯信息,指向 Dask 中的这段代码。

if is_locking_enabled():
    try:
        self._lock_path = os.path.join(self.dir_path + DIR_LOCK_EXT)
        assert not os.path.exists(self._lock_path)
        logger.debug("Locking %r...", self._lock_path)
        # Avoid a race condition before locking the file
        # by taking the global lock
        try:
                with workspace._global_lock():
                    self._lock_file = locket.lock_file(self._lock_path)
                    self._lock_file.acquire()

看来 Dask 正在尝试使用基于文件的锁。不幸的是,一些 NFS 系统不喜欢基于文件的锁,或者处理速度很慢。在 Summit 的情况下,主目录实际上是从计算节点以只读方式挂载的,因此基于文件的锁会直接失败。查看 is_locking_enabled 函数,我们看到它检查了一个配置值。

def is_locking_enabled():
    return dask.config.get("distributed.worker.use-file-locking")

所以我们将其添加到我们的配置文件中。同时,我将多进程方法从 forkserver 切换到 spawn(我以为这可能有帮助,但并没有),这相对无害。

distributed:
  worker:
    multiprocessing-method: spawn
    use-file-locking: False

jobqueue:
  lsf:
    cores: 128
    processes: 8
    memory: 500 GB
    job-extra:
      - "-nnodes 1"
    interface: ib0
    header-skip:
    - "-R"
    - "-n "
    - "-M"

labextension:
  factory:
     module: 'dask_jobqueue'
     class: 'LSFCluster'
     args: []
     kwargs:
       project: your-project-id

结论

这篇文章概述了我在让 Dask 在特定 HPC 系统上运行时遇到的许多问题。这些问题并非普遍存在,所以您可能不会遇到它们,但它们也不是非常罕见。我写这篇文章的主要目的是让大家了解 Dask 与 HPC 系统交互时可能出现的各种问题。

上述问题都不算特别严重。它们以前都发生过,并且都有可以在配置文件中写下的解决方案。然而,找出问题所在可能具有挑战性,通常需要熟悉 Dask 和熟悉该特定 HPC 系统的人员的综合专业知识。

jobqueue.dask.org/en/latest/configurations.html 这里发布了一些配置文件,这可能会提供信息。 Dask Jobqueue 问题跟踪器 也是一个相当友好的地方,里面有很多 IT 专业人士和 Dask 专家。

另外,提醒一下,您不需要拥有 HPC 机器才能使用 Dask。Dask 可以方便地从其他云、Hadoop 和本地系统部署。更多信息请参阅 Dask 设置文档

未来工作:GPU

Summit 之所以速度快是因为它有很多 GPU。我接下来会研究这方面,但这可能足够写另一篇完整的博客文章了 :)

分支

对于在家(或在 Summit 上)跟着操作的任何人。我正在使用以下开发分支:

但希望在撰写本文一个月内,一切都能处于一个不错的发布状态。


博客评论由 Disqus 提供支持