执行摘要

我们展示了一项实验,探讨如何将数据从像 Dask 这样松耦合的并行计算系统传递给像 MPI 这样紧耦合的并行计算系统。

我们提供了动机和一个完整易懂的示例。

以下是代码和结果的要点.

动机

免责声明:本文内容尚未完善,也不适用于生产环境。这是一项旨在引发讨论的实验。不提供长期支持。

我们经常收到以下问题

我如何使用 Dask 预处理数据,然后将结果传递给传统的 MPI 应用程序?

你可能需要这样做,因为你正在支持用 MPI 编写的遗留代码,或者因为你的计算需要只有 MPI 才能提供的紧耦合并行性。

第一个解决方案:写入磁盘

最简单的做法当然是将你的 Dask 结果写入磁盘,然后用 MPI 从磁盘重新加载。考虑到你的计算相对于数据加载的成本,这可能是一个不错的选择。

在本文的其余部分,我们将假设这不是一个好的选择。

第二个解决方案

我们有一个用 MPI4Py 编写的简单的 MPI 库,其中每个 rank 只打印它收到的所有数据。尽管原则上它可以调用 C++ 代码,并执行任意的 MPI 操作。

# my_mpi_lib.py
from mpi4py import MPI

comm = MPI.COMM_WORLD

def print_data_and_rank(chunks: list):
    """ Fake function that mocks out how an MPI function should operate

    -   It takes in a list of chunks of data that are present on this machine
    -   It does whatever it wants to with this data and MPI
        Here for simplicity we just print the data and print the rank
    -   Maybe it returns something
    """
    rank = comm.Get_rank()

    for chunk in chunks:
        print("on rank:", rank)
        print(chunk)

    return sum(chunk.sum() for chunk in chunks)

在我们的 Dask 程序中,我们将正常使用 Dask 来加载数据、进行一些预处理,然后将所有这些数据交给每个 MPI rank,后者将调用上面提到的 print_data_and_rank 函数来初始化 MPI 计算。

# my_dask_script.py

# Set up Dask workers from within an MPI job using the dask_mpi project
# See https://mpi.dask.org.cn/en/latest/

from dask_mpi import initialize
initialize()

from dask.distributed import Client, wait, futures_of
client = Client()

# Use Dask Array to "load" data (actually just create random data here)

import dask.array as da
x = da.random.random(100000000, chunks=(1000000,))
x = x.persist()
wait(x)

# Find out where data is on each worker
# TODO: This could be improved on the Dask side to reduce boiler plate

from toolz import first
from collections import defaultdict
key_to_part_dict = {str(part.key): part for part in futures_of(x)}
who_has = client.who_has(x)
worker_map = defaultdict(list)
for key, workers in who_has.items():
    worker_map[first(workers)].append(key_to_part_dict[key])


# Call an MPI-enabled function on the list of data present on each worker

from my_mpi_lib import print_data_and_rank

futures = [client.submit(print_data_and_rank, list_of_parts, workers=worker)
           for worker, list_of_parts in worker_map.items()]

wait(futures)

client.close()

然后,我们可以使用常规的 mpirunmpiexec 命令来调用这种 Dask 和 MPI 程序混合的模式。

mpirun -np 5 python my_dask_script.py

刚才发生了什么

所以 MPI 启动并运行了我们的脚本。dask-mpi 项目在 rank 0 上设置了一个 Dask 调度器,在 rank 1 上运行我们的客户端代码,然后在 ranks 2+ 上运行了一堆 worker。

  • Rank 0:运行 Dask 调度器
  • Rank 1:运行我们的脚本
  • Ranks 2+:运行 Dask worker

然后我们的脚本创建了一个 Dask 数组,尽管在这里它很可能从某个来源读取数据,并在继续之前进行更复杂的 Dask 操作。

然后我们等待所有 Dask 工作完成并处于空闲状态。接着我们查询调度器中的状态,以找出所有数据存储在哪里。这就是这里的代码

# Find out where data is on each worker
# TODO: This could be improved on the Dask side to reduce boiler plate

from toolz import first
from collections import defaultdict
key_to_part_dict = {str(part.key): part for part in futures_of(x)}
who_has = client.who_has(x)
worker_map = defaultdict(list)
for key, workers in who_has.items():
    worker_map[first(workers)].append(key_to_part_dict[key])

诚然,这段代码很糟糕,对非 Dask 专家(甚至 Dask 专家自己)来说也不是特别友好或明显,我是从执行相同技巧的 Dask XGBoost 项目 中偷来的。

但在此之后,我们只是使用 Dask 的 Futures 接口 对所有数据调用我们的 MPI 库的初始化函数 print_data_and_rank。该函数直接从本地内存获取数据(Dask worker 和 MPI rank 在同一个进程中),并执行 MPI 应用程序所需的任何操作。

未来工作

这可以通过几种方式改进

  1. 上面提到的“糟糕”的代码或许可以放入某个库代码中,以便人们更容易使用这种模式。

  2. 理想情况下,计算的 Dask 部分不必也由 MPI 管理,而是可以自行启动 MPI。

    你可以想象 Dask 运行在像 Kubernetes 这样的平台上,处理高度动态的工作,按需伸缩。然后它会达到需要运行一些 MPI 代码的时候,于是它会自行在其 worker 进程上启动 MPI,并在其数据上运行 MPI 应用程序。

  3. 我们在这里没有真正提及关于弹性/容错的内容。我猜测这并不难做到(Dask 有很多机制来构建复杂的任务间关系),但我没有在上面解决这个问题。

以下是代码和结果的要点.


博客评论由 Disqus 提供支持