同时运行 Dask 和 MPI 程序 一项实验
作者:Matthew Rocklin
执行摘要
我们展示了一项实验,探讨如何将数据从像 Dask 这样松耦合的并行计算系统传递给像 MPI 这样紧耦合的并行计算系统。
我们提供了动机和一个完整易懂的示例。
动机
免责声明:本文内容尚未完善,也不适用于生产环境。这是一项旨在引发讨论的实验。不提供长期支持。
我们经常收到以下问题
我如何使用 Dask 预处理数据,然后将结果传递给传统的 MPI 应用程序?
你可能需要这样做,因为你正在支持用 MPI 编写的遗留代码,或者因为你的计算需要只有 MPI 才能提供的紧耦合并行性。
第一个解决方案:写入磁盘
最简单的做法当然是将你的 Dask 结果写入磁盘,然后用 MPI 从磁盘重新加载。考虑到你的计算相对于数据加载的成本,这可能是一个不错的选择。
在本文的其余部分,我们将假设这不是一个好的选择。
第二个解决方案
我们有一个用 MPI4Py 编写的简单的 MPI 库,其中每个 rank 只打印它收到的所有数据。尽管原则上它可以调用 C++ 代码,并执行任意的 MPI 操作。
# my_mpi_lib.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
def print_data_and_rank(chunks: list):
""" Fake function that mocks out how an MPI function should operate
- It takes in a list of chunks of data that are present on this machine
- It does whatever it wants to with this data and MPI
Here for simplicity we just print the data and print the rank
- Maybe it returns something
"""
rank = comm.Get_rank()
for chunk in chunks:
print("on rank:", rank)
print(chunk)
return sum(chunk.sum() for chunk in chunks)
在我们的 Dask 程序中,我们将正常使用 Dask 来加载数据、进行一些预处理,然后将所有这些数据交给每个 MPI rank,后者将调用上面提到的 print_data_and_rank
函数来初始化 MPI 计算。
# my_dask_script.py
# Set up Dask workers from within an MPI job using the dask_mpi project
# See https://mpi.dask.org.cn/en/latest/
from dask_mpi import initialize
initialize()
from dask.distributed import Client, wait, futures_of
client = Client()
# Use Dask Array to "load" data (actually just create random data here)
import dask.array as da
x = da.random.random(100000000, chunks=(1000000,))
x = x.persist()
wait(x)
# Find out where data is on each worker
# TODO: This could be improved on the Dask side to reduce boiler plate
from toolz import first
from collections import defaultdict
key_to_part_dict = {str(part.key): part for part in futures_of(x)}
who_has = client.who_has(x)
worker_map = defaultdict(list)
for key, workers in who_has.items():
worker_map[first(workers)].append(key_to_part_dict[key])
# Call an MPI-enabled function on the list of data present on each worker
from my_mpi_lib import print_data_and_rank
futures = [client.submit(print_data_and_rank, list_of_parts, workers=worker)
for worker, list_of_parts in worker_map.items()]
wait(futures)
client.close()
然后,我们可以使用常规的 mpirun
或 mpiexec
命令来调用这种 Dask 和 MPI 程序混合的模式。
mpirun -np 5 python my_dask_script.py
刚才发生了什么
所以 MPI 启动并运行了我们的脚本。dask-mpi 项目在 rank 0 上设置了一个 Dask 调度器,在 rank 1 上运行我们的客户端代码,然后在 ranks 2+ 上运行了一堆 worker。
- Rank 0:运行 Dask 调度器
- Rank 1:运行我们的脚本
- Ranks 2+:运行 Dask worker
然后我们的脚本创建了一个 Dask 数组,尽管在这里它很可能从某个来源读取数据,并在继续之前进行更复杂的 Dask 操作。
然后我们等待所有 Dask 工作完成并处于空闲状态。接着我们查询调度器中的状态,以找出所有数据存储在哪里。这就是这里的代码
# Find out where data is on each worker
# TODO: This could be improved on the Dask side to reduce boiler plate
from toolz import first
from collections import defaultdict
key_to_part_dict = {str(part.key): part for part in futures_of(x)}
who_has = client.who_has(x)
worker_map = defaultdict(list)
for key, workers in who_has.items():
worker_map[first(workers)].append(key_to_part_dict[key])
诚然,这段代码很糟糕,对非 Dask 专家(甚至 Dask 专家自己)来说也不是特别友好或明显,我是从执行相同技巧的 Dask XGBoost 项目 中偷来的。
但在此之后,我们只是使用 Dask 的 Futures 接口 对所有数据调用我们的 MPI 库的初始化函数 print_data_and_rank
。该函数直接从本地内存获取数据(Dask worker 和 MPI rank 在同一个进程中),并执行 MPI 应用程序所需的任何操作。
未来工作
这可以通过几种方式改进
-
上面提到的“糟糕”的代码或许可以放入某个库代码中,以便人们更容易使用这种模式。
-
理想情况下,计算的 Dask 部分不必也由 MPI 管理,而是可以自行启动 MPI。
你可以想象 Dask 运行在像 Kubernetes 这样的平台上,处理高度动态的工作,按需伸缩。然后它会达到需要运行一些 MPI 代码的时候,于是它会自行在其 worker 进程上启动 MPI,并在其数据上运行 MPI 应用程序。
-
我们在这里没有真正提及关于弹性/容错的内容。我猜测这并不难做到(Dask 有很多机制来构建复杂的任务间关系),但我没有在上面解决这个问题。
博客评论由 Disqus 提供支持