在云端扩展规模时,过度使用计算资源可能会变得昂贵。

举一个真实的例子,我当时正在为一个空间基因测序设备开发图像处理管线,该设备不仅能报告哪些基因正在表达,还能报告它们在细胞 3D 体积中的位置。为了获取这些信息,一台专用显微镜会拍摄细胞培养物或组织的快照,然后将得到的数据通过 Dask 管线进行处理。

该管线相当慢,所以我粗略估算了一下,一旦我们开始为客户处理更多数据,我们的计算成本会是多少。结果发现,我们 70% 的收入都将用于支付云计算费用!

显然我需要优化这段代码。

当我们考虑大规模计算中的瓶颈时,通常会关注 CPU:我们希望使用更多的 CPU 核心以获得更快的计算结果。支付所有这些 CPU 费用可能很昂贵,就像本例一样,我也确实成功地大幅降低了 CPU 使用率。

但高内存使用率也是一个问题,解决这个问题促使我开发了一系列工具,这些工具也能帮助你优化和降低 Dask 的内存使用率。

在本文的其余部分,你将学到

问题:固定的处理块和高内存/CPU 比率

提醒一下,我当时正在开发一个处理来自专用显微镜数据的 Dask 管线。生成的数据量相当大,并且必须将某些图像子集作为一个单元一起处理。从计算角度来看,我们实际上有一系列输入 X0, X1, X2, ...,它们可以由一个函数 f() 独立处理。

f() 的内部处理不容易进一步并行化。从 CPU 调度角度来看,这没问题,考虑到大量的 X 输入,这仍然是一个易于并行化的问题 (embarrassingly parallel problem)。

例如,如果我配置了一台具有 4 个 CPU 核心的虚拟机,为了处理这些数据,我可以启动四个进程,每个进程都会充分利用一个核心。如果我有 12 个输入,并且每个处理步骤所需时间大致相同,它们可能会按如下方式运行:

  • CPU0: f(X0), f(X4), f(X8)
  • CPU1: f(X1), f(X5), f(X9)
  • CPU2: f(X2), f(X6), f(X10)
  • CPU3: f(X3), f(X7), f(X11)

如果我能让 f() 运行得更快,那么整个管线也会运行得更快。

然而,CPU 并不是计算中唯一使用的资源:RAM 也可能成为瓶颈。例如,假设每次调用 f(Xi) 需要 12GB RAM。这意味着要完全利用 4 个 CPU,我需要 48GB RAM——但如果我的计算机只有 16GB RAM 怎么办?

即使我的计算机有 4 个 CPU,在一台只有 16GB RAM 的计算机上我也只能利用一个 CPU,因为我没有足够的 RAM 来并行运行多个任务。 实际上,这些任务是在云端运行的,在那里我可以选择合适的预配置虚拟机实例,确保必要的 RAM/核心比率得以保持。在某些云平台上,你可以自由设置你启动的每个虚拟机的 RAM 容量和 CPU 核心数量。

然而,我不太清楚峰值内存使用量是多少,所以我不得不限制并行度以减少内存不足错误。结果,我们使用的默认虚拟机有一半的 CPU 处于空闲状态,我们为这些资源付了费却没有使用。

为了恰当地配置硬件并充分利用所有 CPU,我需要知道每个任务的峰值内存使用量是多少。为此,我创建了一个新工具。

使用 dask-memusage 测量任务峰值内存使用情况

dask-memusage 是一个用于测量 Dask 执行图中每个任务峰值内存使用情况的工具。

  • 按任务划分,因为 Dask 将代码作为任务图来执行,并且该图决定了可以使用的并行度。
  • 峰值内存很重要,因为它是瓶颈所在。每个任务的平均内存使用量是 4GB 并不重要,如果图中的两个并行任务同时都需要 12GB,那么如果你想在同一台计算机上运行这两个任务,你需要 24GB RAM。

使用 dask-memusage

由于基因测序代码是专有的且相当复杂,我们来使用另一个例子。我们将统计一些文本文件中单词的出现次数,然后报告每个文件中最常见的 10 个单词。你可以想象稍后将数据组合起来,但在本简单示例中我们不会费心做这件事。

import sys
import gc
from time import sleep
from pathlib import Path
from dask.bag import from_sequence
from collections import Counter
from dask.distributed import Client, LocalCluster
import dask_memusage


def calculate_top_10(file_path: Path):
    gc.collect()  # See notes below

    # Load the file
    with open(file_path) as f:
        data = f.read()

    # Count the words
    counts = Counter()
    for word in data.split():
        counts[word.strip(".,'\"").lower()] += 1

    # Choose the top 10:
    by_count = sorted(counts.items(), key=lambda x: x[1])
    sleep(0.1)  # See notes below
    return (file_path.name, by_count[-10:])


def main(directory):
    # Setup the calculation:

    # Create a 4-process cluster (running locally). Note only one thread
    # per-worker: because polling is per-process, you can't run multiple
    # threads per worker, otherwise you'll get results that combine memory
    # usage of multiple tasks.
    cluster = LocalCluster(n_workers=4, threads_per_worker=1,
                           memory_limit=None)
    # Install dask-memusage:
    dask_memusage.install(cluster.scheduler, "memusage.csv")
    client = Client(cluster)

    # Create the task graph:
    files = from_sequence(Path(directory).iterdir())
    graph = files.map(calculate_top_10)
    graph.visualize(filename="example2.png", rankdir="TD")

    # Run the calculations:
    for result in graph.compute():
        print(result)
    # ... do something with results ...


if __name__ == '__main__':
    main(sys.argv[1])

任务图如下所示

并行度很高!

我们可以在一些文件上运行该程序

$ pip install dask[bag] dask_memusage
$ python example2.py files/
('frankenstein.txt', [('that', 1016), ('was', 1021), ('in', 1180), ('a', 1438), ('my', 1751), ('to', 2164), ('i', 2754), ('of', 2761), ('and', 3025), ('the', 4339)])
('pride_and_prejudice.txt', [('she', 1660), ('i', 1730), ('was', 1832), ('in', 1904), ('a', 1981), ('her', 2142), ('and', 3503), ('of', 3705), ('to', 4188), ('the', 4492)])
('greatgatsby.txt', [('that', 564), ('was', 760), ('he', 770), ('in', 849), ('i', 999), ('to', 1197), ('of', 1224), ('a', 1440), ('and', 1565), ('the', 2543)])
('big.txt', [('his', 40032), ('was', 45356), ('that', 47924), ('he', 48276), ('a', 83228), ('in', 86832), ('to', 114184), ('and', 152284), ('of', 159888), ('the', 314908)])

正如所料,最常见的单词是词干,但顺序仍然存在一些变化。

接下来,我们看看 dask-memusage 的结果。

dask-memusage 输出及其工作原理

你会注意到,dask-memusage 的实际使用除了 import 之外,只需要额外一行代码

dask_memusage.install(cluster.scheduler, "memusage.csv")

这将以 10ms 的间隔轮询进程,获取按任务划分的峰值内存使用情况。在本例中,memusage.csv 文件内容如下所示

task_key,min_memory_mb,max_memory_mb
"('from_sequence-3637e6ff937ef8488894df60a80f62ed', 3)",51.2421875,51.2421875
"('from_sequence-3637e6ff937ef8488894df60a80f62ed', 0)",51.70703125,51.70703125
"('from_sequence-3637e6ff937ef8488894df60a80f62ed', 1)",51.28125,51.78515625
"('from_sequence-3637e6ff937ef8488894df60a80f62ed', 2)",51.30859375,51.30859375
"('calculate_top_10-afc867e38c3bd0aac8c18bb00d3634ca', 2)",56.19140625,56.19140625
"('calculate_top_10-afc867e38c3bd0aac8c18bb00d3634ca', 0)",51.70703125,54.26953125
"('calculate_top_10-afc867e38c3bd0aac8c18bb00d3634ca', 1)",52.30078125,52.30078125
"('calculate_top_10-afc867e38c3bd0aac8c18bb00d3634ca', 3)",51.48046875,384.00390625

对于图中的每个任务,我们被告知最小内存使用量和峰值内存使用量,单位是 MB。

更易读的形式

task_key min_memory_mb max_memory_mb
”(‘from_sequence-3637e6ff937ef8488894df60a80f62ed’, 3)” 51.2421875 51.2421875
”(‘from_sequence-3637e6ff937ef8488894df60a80f62ed’, 0)” 51.70703125 51.70703125
”(‘from_sequence-3637e6ff937ef8488894df60a80f62ed’, 1)” 51.28125 51.78515625
”(‘from_sequence-3637e6ff937ef8488894df60a80f62ed’, 2)” 51.30859375 51.30859375
”(‘calculate_top_10-afc867e38c3bd0aac8c18bb00d3634ca’, 2)” 56.19140625 56.19140625
”(‘calculate_top_10-afc867e38c3bd0aac8c18bb00d3634ca’, 0)” 51.70703125 54.26953125
”(‘calculate_top_10-afc867e38c3bd0aac8c18bb00d3634ca’, 1)” 52.30078125 52.30078125
”(‘calculate_top_10-afc867e38c3bd0aac8c18bb00d3634ca’, 3)” 51.48046875 384.00390625

最后四行是值得关注的;这四行都以约 50MB RAM 的最小内存使用量开始,然后随着代码运行,内存可能会增加,也可能不会。增加多少可能取决于文件的大小;它们中的大多数都很小,所以内存使用量变化不大。其中一个文件使用的最大内存比其他文件多得多,为 384MB RAM;据推测它是 25MB 的 big.txt 文件,因为其他文件都小于 1MB。

使用的机制(轮询进程峰值内存)有一些局限性

  • 你会注意到在 calculate_top_10() 函数的顶部有一个 gc.collect();这确保我们不会计算尚未清理的先前代码遗留的内存。
  • calculate_top_10() 函数的底部还有一个 sleep()。由于使用了轮询,运行过快的任务将无法获得准确信息——轮询大约每 10ms 发生一次,所以你希望至少暂停 20ms。
  • 最后,由于轮询是基于进程的,你不能在每个 worker 中运行多个线程,否则你将得到结合了多个任务内存使用量的数据。

解释数据

我们学到的是 calculate_top_10() 函数的内存使用量随文件大小增长;这可以用来描述工作负载的内存需求。也就是说,我们可以创建一个模型来关联数据输入大小和所需的 RAM,然后我们可以计算在任何给定并行度级别下所需的 RAM。如果我们假设每个 CPU 核心运行一个任务,这就可以指导我们选择硬件。

回到我最初的驱动问题,即基因测序管线:利用来自 dask-memusage 的数据,我得以得出一个公式,表示“对于这种大小的输入,需要这么多内存”。因此,每当我们运行批量作业时,就可以根据机器上的 CPU 和 RAM 数量,尽可能高地设置并行度。

虽然这增加了并行度,但仍然不够——处理过程仍然使用了大量的 RAM,而这些 RAM 我们不得不通过时间(使用较少的 CPU)或金钱(购买更昂贵的具有更多 RAM 的虚拟机)来支付。所以下一步是减少内存使用。

使用 Fil 减少内存使用

如果我们查看词频统计示例的 dask-memusage 输出,内存使用量似乎相当高:对于一个 25MB 的文件,我们用了 330MB 的 RAM 来统计词频。仔细思考这段代码的理想版本应该如何工作,我们应该可以用少得多的内存来处理该文件(例如,我们可以重新设计代码,逐行处理文件,从而减少内存)。

这就是 dask-memusage 另一种有用的方式:它可以以任务的粒度,指向需要优化内存使用的特定代码。 不过,一个任务可能是一大块代码,所以下一步是使用一个可以指向特定代码行的内存分析器。

在开发基因测序工具时,我使用了 memory_profiler 包,虽然它有效,并且我成功地减少了很多内存使用,但我发现它相当难用。事实证明,对于批量数据处理(这是 Dask 的典型用例),你需要一种不同类型的内存分析器

因此,在我离开那份工作后,我创建了一个名为 Fil 的内存分析器,它专门设计用于查找峰值内存使用。与可以在生产工作负载上运行的 dask-memusage 不同,Fil 会减慢你的执行速度,并且还有一些我目前正在努力解决的其他限制(截至 2021 年 3 月,它不支持多进程),所以目前更适合用于手动分析。

我们可以编写一个只在 big.txt 文件上运行的小脚本

from pathlib import Path
from example2 import calculate_top_10

calculate_top_10(Path("files/big.txt"))

在 Fil 下运行它

pip install filprofiler
fil-profile run example3.py

结果向我们展示了大部分内存是在哪里分配的

读取文件占用了 8% 的内存,但 data.split() 占用了 84% 的内存。也许我们不应该将整个文件加载到内存中并将其拆分成单词,而应该逐行处理文件。如果这是真实代码,下一步很好的措施就是修改 calculate_top_10() 的实现方式。

后续步骤

如果你的 Dask 工作负载使用了太多内存,你应该怎么做?

如果你使用 Distributed 后端运行 Dask 工作负载,并且可以接受每个 worker 只有一个线程,那么使用 dask-memusage 运行将为你提供生产工作负载上真实的每个任务的内存使用情况。然后你可以以多种方式使用这些结果信息

在我最初的使用案例——基因测序管线中,我通过结合较低的内存使用和较低的 CPU 使用,成功地将成本降至更为适度的水平。在进行研发时,我也能够在相同的硬件成本下获得更快的计算结果。

你可以在这里了解更多关于 dask-memusage 的信息,以及在这里了解更多关于 Fil 内存分析器 的信息


博客评论由 Disqus 提供支持