快速消息序列化
这项工作得到了 Continuum Analytics 和 XDATA 项目 作为 Blaze 项目的一部分的支持。
极高的性能并非在于做好一件事,而在于没有任何一件事做得糟糕。
本周我优化了由 dask.distributed
使用的节点间通信协议。这是一项有趣的优化练习,涉及了几个不同且意想不到的组件。我分别处理了 Pickle、NumPy、Tornado、MsgPack 和压缩库。
这篇博文并非宣传特定功能,而是讲述了我为了在 Python 技术栈中快速传输极小和极大的数值数据而在设计和优化协议时遇到的问题。
我们非常重视处理大量小消息(每秒数千个 100 字节消息)和处理超大消息(100-1000 MB)这两种情况。这涵盖了一个有趣的性能范围。最终我们得到的协议在小消息情况下开销约为 5 微秒,在大消息情况下运行速度为 1-1.5 GB/s。
发现问题
这发生在我正在准备一个演示时,该演示使用 dask.array
在分布式集群上为 Continuum 网络研讨会进行。我注意到我的计算花费了比预期长得多的时间。Web UI 很快指出,我的机器花费了 10-20 秒来移动它们之间的 30 MB numpy 数组数据块。这非常奇怪,因为我使用的是 100MB/s 的网络,所以我预计这些传输花费的时间更接近 0.3 秒而不是 15 秒。
Web UI 使这一切变得非常明显,所以我学到的第一课是,当性能问题变得显而易见时,可视化性能分析工具是多么宝贵。在此感谢帮助开发 Dask 实时 Web UI 的 Bokeh 开发者。
问题 1:Tornado 的哨兵值
Dask 的网络通信构建在 Tornado 的 TCP IOStreams 之上。
在套接字上区分消息有两种常见方式:使用标记消息末尾的哨兵值,以及在每条消息前加上长度前缀。早期我们在 Dask 中尝试了这两种方法,但发现每条消息前加上长度前缀很慢。结果发现这是因为 TCP 套接字会尝试批量处理小消息以提高带宽。关闭这项优化最终成为一个有效且简单的解决方案,请参阅 TCP_NODELAY
参数。
然而,在我们弄清楚之前,我们使用哨兵值很长时间。不幸的是,Tornado 对大消息的哨兵值处理得不好。在接收到每条新消息时,它会遍历所有缓冲数据以查找哨兵值。这会产生大量的复制并读取大量的字节。如果您的消息只有几千字节,这不成问题(这在 Web 开发中很常见),但如果您的消息长达数百万或数十亿字节,那就太糟糕了。
改回使用长度前缀和关闭无延迟优化,将我们每节点的带宽从 3MB/s 提高到 20MB/s。感谢 Ben Darnell(Tornado 主要开发者)帮助我们找到这个问题。
问题 2:内存复制
一台不错的机器可以以 5 GB/s 的速度复制内存。如果你的网络只有 100 MB/s,那么你的系统中即使发生几次内存复制也无需担心。这会导致代码看起来像这样
socket.send(header + payload)
这段代码将两个字节串 header
和 payload
连接起来,然后通过套接字发送结果。如果我们非常在意避免内存复制,那么我们可能会将这两个分开发送
socket.send(header)
socket.send(payload)
但谁会在意呢,对吧?以 5 GB/s 的速度复制内存很便宜!
不幸的是,在以下任一条件下,这种情况就不成立了
- 你足够粗心,多次执行此操作
- 你发现自己在内存带宽非常低的机器上,比如慢 10 倍,就像 某些 EC2 机器上一样。
这两种情况我都遇到了,但幸运的是,通常很容易通过适度的努力将复制次数减少到很少(我们减少到了三次)。
问题 3:不必要的压缩
如果 LZ4 或 Snappy 可用,Dask 会使用它们压缩所有大消息。不幸的是,如果你的数据不太可压缩,那么这大部分时间都浪费了。双重不幸的是,你还需要在接收端解压数据。解压不太可压缩的数据出奇地慢。
现在我们采用以下策略进行压缩
- 如果消息小于 10kB,则不进行压缩
- 从中选取五个 10kB 的数据样本并进行压缩。如果结果压缩效果不好,则不压缩整个负载。
- 压缩整个负载,如果压缩效果不好,则直接发送原始数据,以免接收方进行解压。
在这种情况下,我们使用廉价的检查来避免不必要的压缩。对于我们非常重视的小消息,我们也完全避免了任何开销。
问题 4:Cloudpickle 不如 Pickle 快
这令人惊讶,因为 cloudpickle 对于简单的数据类型(如 NumPy 数组)大多委托给 Pickle 处理。
In [1]: import numpy as np
In [2]: data = np.random.randint(0, 255, dtype='u1', size=10000000)
In [3]: import pickle, cloudpickle
In [4]: %time len(pickle.dumps(data, protocol=-1))
CPU times: user 8.65 ms, sys: 8.42 ms, total: 17.1 ms
Wall time: 16.9 ms
Out[4]: 10000161
In [5]: %time len(cloudpickle.dumps(data, protocol=-1))
CPU times: user 20.6 ms, sys: 24.5 ms, total: 45.1 ms
Wall time: 44.4 ms
Out[5]: 10000161
但事实证明,cloudpickle 使用的是 Python 实现,而 pickle 本身(或 Python 2 中的 cPickle
)使用的是编译过的 C 实现。幸运的是,这很容易纠正,对 Python 中常见的大数据格式(NumPy 和 Pandas)进行快速类型检查就能获得速度提升。
问题 5:Pickle 仍然比预期的慢
Pickle 的运行速度大约是内存复制的一半,这与你对一个主要只是“序列化数据类型、步长,然后附加数据字节”的协议的预期相符。其中肯定存在一个多余的内存复制。
参见 问题 7544
问题 6:MsgPack 不擅长处理大型字节串
Dask 使用 MsgPack 序列化大多数消息,这通常非常快。不幸的是,MsgPack 规范不支持大于 4GB 的字节串(我们确实会遇到这种情况),并且 Python 实现不能非常高效地传递大型字节串。因此,我们不得不单独处理大型字节串。任何包含大小超过 1MB 字节串的消息,都会将其剥离并在单独的帧中发送。这既避免了 MsgPack 的开销,又避免了内存复制(我们可以直接将字节发送到套接字)。
问题 7:Tornado 会进行复制
Windows 上的套接字不接受大于 128kB 的负载。因此,Tornado 会将大消息分割成许多小消息。在 Linux 上,这种内存复制是多余的。可以在 Tornado 中加入一些逻辑来去除它。我可能会在不久的将来做这件事。
结果
我们大约在 5 微秒内序列化小消息(感谢 msgpack!),并通过三次内存复制(约 1-1.5 GB/s)来移动大字节数据,这通常比大多数正在使用的网络更快。
这是在我的机器上通过 localhost 发送和接收一个千兆字节大小的随机值 NumPy 数组到同一进程的性能分析(我的机器上速度为 500 MB/s)。
381360 function calls (381323 primitive calls) in 1.451 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.366 0.366 0.366 0.366 {built-in method dumps}
8 0.289 0.036 0.291 0.036 iostream.py:360(write)
15353 0.228 0.000 0.228 0.000 {method 'join' of 'bytes' objects}
15355 0.166 0.000 0.166 0.000 {method 'recv' of '_socket.socket' objects}
15362 0.156 0.000 0.398 0.000 iostream.py:1510(_merge_prefix)
7759 0.101 0.000 0.101 0.000 {method 'send' of '_socket.socket' objects}
17/14 0.026 0.002 0.686 0.049 gen.py:990(run)
15355 0.021 0.000 0.198 0.000 iostream.py:721(_read_to_buffer)
8 0.018 0.002 0.203 0.025 iostream.py:876(_consume)
91 0.017 0.000 0.335 0.004 iostream.py:827(_handle_write)
89 0.015 0.000 0.217 0.002 iostream.py:585(_read_to_buffer_loop)
122567 0.009 0.000 0.009 0.000 {built-in method len}
15355 0.008 0.000 0.173 0.000 iostream.py:1010(read_from_fd)
38369 0.004 0.000 0.004 0.000 {method 'append' of 'list' objects}
7759 0.004 0.000 0.104 0.000 iostream.py:1023(write_to_fd)
1 0.003 0.003 1.451 1.451 ioloop.py:746(start)
主要的额外开销包括:
- 400毫秒:Pickle 序列化 NumPy 数组
- 400毫秒:Tornado 内的字节串处理
在此之后,我们主要受限于通过网络传输字节的速度。
结论
编写快速代码并非在于特别精通某一方面,而在于消除一切可能阻碍你的因素。当你接近峰值性能时,先前微小的缺陷会突然变成主要的瓶颈。这里的成功取决于频繁的性能分析,并对意想不到和令人惊讶的开销保持开放的心态。
链接
- EC2 内存复制缓慢的 StackOverflow 问题。
- Tornado 发送大消息的问题。
- 关于 TCP 协议中 Nagle 算法处理小数据包的维基百科页面。
- NumPy 双重内存复制问题。
- Cloudpickle 对 memoryview 支持的问题。
- dask.distributed
博客评论由 Disqus 提供