面向核外ND数组 -- Dask + Toolz = Bag
这项工作由 Continuum Analytics 和 XDATA Program 作为 Blaze Project 的一部分提供支持。
太长不看 我们使用 dask 来构建一个并行 Python 列表。
引言
这是使用 NumPy 和 dask 构建核外 nd-array 系列文章中的第七篇。您可以在此处查看这些文章
今天我们暂时不讨论 ND 数组,而是展示任务调度如何处理其他集合,例如简单的 Python 对象 list
。
非结构化数据
通常在我们拥有 ndarray
或 table/DataFrame
之前,我们会有像日志文件这样的非结构化数据。在这种情况下,经过调整的语言子集(例如 numpy
/pandas
)是不够的,我们需要完整的 Python 语言。
我处理那些不方便处理的大量日志文件的常用方法是使用 Python 流式迭代器,并在一台大型机器上结合使用 多进程或 IPython Parallel。在讨论 toolz
时,我经常撰写文章或发表演讲来介绍这个工作流程。
当引入许多进程时,这种工作流程对于大多数用户来说会变得复杂。为了解决这个问题,我们将我们常用的技巧构建到一个新的 dask.Bag
集合中。
Bag
正如 dask.array
模仿 NumPy 操作(例如矩阵乘法、切片)一样,dask.bag
也模仿标准库中找到的函数式操作,如 map
、filter
、reduce
,以及 toolz
中找到的许多流式函数。
- Dask array = NumPy + 线程
- Dask bag = Python/Toolz + 进程
示例
这是必不可少的词频统计示例
>>> from dask.bag import Bag
>>> b = Bag.from_filenames('data/*.txt')
>>> def stem(word):
... """ Stem word to primitive form """
... return word.lower().rstrip(",.!:;'-\"").lstrip("'\"")
>>> dict(b.map(str.split).map(concat).map(stem).frequencies())
{...}
我们使用所有的核心,并在每个核心上通过内存进行流式处理。我们使用 multiprocessing
,但经过一些工作可以做得更高级。
相关工作
有许多更大、更强大的系统也具有类似的 API,特别是 Spark 和 DPark。如果您有大数据并且需要使用非常多的机器,那么您应该停止阅读本文并去安装它们。
我主要创建 dask.bag 是因为
- 鉴于 dask.array 上已经完成的工作,创建它非常容易
- 我通常只需要多进程 + 一台高性能机器
- 我想要一个易于通过 pip 安装且不使用 JVM 的东西
但再说一遍,如果您有大数据,那么这不适合您。
设计
和之前一样,一个 Bag
只是一个持有任务的字典,外加一些元数据。
>>> d = {('x', 0): (range, 5),
... ('x', 1): (range, 5),
... ('x', 2): (range, 5)}
>>> from dask.bag import Bag
>>> b = Bag(d, 'x', npartitions=3)
通过这种方式,我们将一个集合分解为
[0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
三个独立的部分
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]
当我们抽象地操作大型集合时...
>>> b2 = b.map(lambda x: x * 10)
... 我们会生成新的任务来操作每个组件。
>>> b2.dask
{('x', 0): (range, 5),
('x', 1): (range, 5),
('x', 2): (range, 5)}
('bag-1', 0): (map, lambda x: x * 10, ('x', 0)),
('bag-1', 1): (map, lambda x: x * 10, ('x', 1)),
('bag-1', 2): (map, lambda x: x * 10, ('x', 2))}
当我们只需要具体结果(调用 list
)时,我们会启动一个调度器来执行生成的任务依赖图。
>>> list(b2)
[0, 10, 20, 30, 40, 0, 10, 20, 30, 40, 0, 10, 20, 30, 40]
更复杂的操作会产生更复杂的 dasks。请注意,dask 代码相当“Lisp 式”。幸运的是,这些 dasks 是内部的;用户无需与之交互。
>>> iseven = lambda x: x % 2 == 0
>>> b3 = b.filter(iseven).count().dask
{'bag-3': (sum, [('bag-2', 1), ('bag-2', 2), ('bag-2', 0)]),
('bag-2', 0): (count,
(filter, iseven, (range, 5))),
('bag-2', 1): (count,
(filter, iseven, (range, 5))),
('bag-2', 2): (count,
(filter, iseven, (range, 5)))}
Bag
当前的接口提供以下操作
all frequencies min
any join product
count map std
filter map_partitions sum
fold max topk
foldby mean var
对 bag 的操作会创建任务依赖图。我们最终会并行执行这些图。
执行
我们重新利用用于数组的线程调度器来支持 multiprocessing
,以便即使在纯 Python 代码上也能提供并行性。我们注意避免不必要的数据传输。上面列出的操作都不需要大量的通信。值得注意的是,我们没有任何混洗或散列/聚集的概念。
我们使用 dill 来确保正确序列化函数并收集/报告错误,这两个问题困扰着 Python 中对 multiprocessing
的简单使用。
>>> list(b.map(lambda x: x * 10)) # This works!
[0, 10, 20, 30, 40, 0, 10, 20, 30, 40, 0, 10, 20, 30, 40]
>>> list(b.map(lambda x: x / 0)) # This errs gracefully!
ZeroDivisionError: Execption in remote Process
integer division or modulo by zero
Traceback:
...
这些技巧消除了对用户专业知识的需求。
高效的最佳点
我认为以下配置中存在一个高效的最佳点
- 纯 Python 函数
- 流式/延迟计算数据
- 多进程
- 一台大型机器或非正式集群中的几台机器
这种设置很常见,能够处理TB级的工作流程。根据我的经验,人们很少采用这种方式。他们会使用单线程内存中的 Python,直到它崩溃,然后寻求 Hadoop/Spark 等大数据基础设施,但这样做会带来相对较高的生产力开销。
您的工作站的扩展能力可能超出您的想象。
示例
这里有大约1GB的网络流数据,记录了1996年UC伯克利校区内哪些计算机连接了哪些其他计算机。
846890339:661920 846890339:755475 846890340:197141 168.237.7.10:1163 83.153.38.208:80 2 8 4294967295 4294967295 846615753 176 2462 39 GET 21068906053917068819..html HTTP/1.0
846890340:989181 846890341:2147 846890341:2268 13.35.251.117:1269 207.83.232.163:80 10 0 842099997 4294967295 4294967295 64 1 38 GET 20271810743860818265..gif HTTP/1.0
846890341:80714 846890341:90331 846890341:90451 13.35.251.117:1270 207.83.232.163:80 10 0 842099995 4294967295 4294967295 64 1 38 GET 38127854093537985420..gif HTTP/1.0
这实际上相对干净。许多字段是空格分隔的(不是全部),而且我已经编译并运行了用于从原始格式解压数据的十年旧 C 代码。
让我们使用 Bag 和正则表达式来解析它。
In [1]: from dask.bag import Bag, into
In [2]: b = Bag.from_filenames('UCB-home-IP*.log')
In [3]: import re
In [4]: pattern = """
...: (?P<request_time>\d+:\d+)
...: (?P<response_start>\d+:\d+)
...: (?P<response_end>\d+:\d+)
...: (?P<client_ip>\d+\.\d+\.\d+\.\d+):(?P<client_port>\d+)
...: (?P<server_ip>\d+\.\d+\.\d+\.\d+):(?P<server_port>\d+)
...: (?P<client_headers>\d+)
...: (?P<server_headers>\d+)
...: (?P<if_modified_since>\d+)
...: (?P<response_header_length>\d+)
...: (?P<response_data_length>\d+)
...: (?P<request_url_length>\d+)
...: (?P<expires>\d+)
...: (?P<last_modified>\d+)
...: (?P<method>\w+)
...: (?P<domain>\d+..)\.(?P<extension>\w*)(?P<rest_of_url>\S*)
...: (?P<protocol>.*)""".strip().replace('\n', '\s+')
In [5]: prog = re.compile(pattern)
In [6]: records = b.map(prog.match).map(lambda m: m.groupdict())
这会立即返回。我们只在必要时计算结果。我们通过调用 list
触发计算。
In [7]: list(records.take(1))
Out[7]:
[{'client_headers': '2',
'client_ip': '168.237.7.10',
'client_port': '1163',
'domain': '21068906053917068819.',
'expires': '2462',
'extension': 'html',
'if_modified_since': '4294967295',
'last_modified': '39',
'method': 'GET',
'protocol': 'HTTP/1.0',
'request_time': '846890339:661920',
'request_url_length': '176',
'response_data_length': '846615753',
'response_end': '846890340:197141',
'response_header_length': '4294967295',
'response_start': '846890339:755475',
'rest_of_url': '',
'server_headers': '8',
'server_ip': '83.153.38.208',
'server_port': '80'}]
因为 bag 采用延迟计算,所以这个小结果也会立即返回。
为了展示其深度,我们找到连接次数最多的十个客户端/服务器对。
In [8]: counts = records.pluck(['client_ip', 'server_ip']).frequencies()
In [9]: %time list(counts.topk(10, key=lambda x: x[1]))
CPU times: user 11.2 s, sys: 1.15 s, total: 12.3 s
Wall time: 50.4 s
Out[9]:
[(('247.193.34.56', '243.182.247.102'), 35353),
(('172.219.28.251', '47.61.128.1'), 22333),
(('240.97.200.0', '108.146.202.184'), 17492),
(('229.112.177.58', '47.61.128.1'), 12993),
(('146.214.34.69', '119.153.78.6'), 12554),
(('17.32.139.174', '179.135.20.36'), 10166),
(('97.166.76.88', '65.81.49.125'), 8155),
(('55.156.159.21', '157.229.248.255'), 7533),
(('55.156.159.21', '124.77.75.86'), 7506),
(('55.156.159.21', '97.5.181.76'), 7501)]
与 Spark 的比较
首先,将 PySpark 在本地运行与此进行比较既愚蠢又不公平。PySpark 在分布式环境中提供了更多功能。
In [1]: import pyspark
In [2]: sc = pyspark.SparkContext('local')
In [3]: from glob import glob
In [4]: filenames = sorted(glob('UCB-home-*.log'))
In [5]: rdd = sc.parallelize(filenames, numSlices=4)
In [6]: import re
In [7]: pattern = ...
In [8]: prog = re.compile(pattern)
In [9]: lines = rdd.flatMap(lambda fn: list(open(fn)))
In [10]: records = lines.map(lambda line: prog.match(line).groupdict())
In [11]: ips = records.map(lambda rec: (rec['client_ip'], rec['server_ip']))
In [12]: from toolz import topk
In [13]: %time dict(topk(10, ips.countByValue().items(), key=1))
CPU times: user 1.32 s, sys: 52.2 ms, total: 1.37 s
Wall time: 1min 21s
Out[13]:
{('146.214.34.69', '119.153.78.6'): 12554,
('17.32.139.174', '179.135.20.36'): 10166,
('172.219.28.251', '47.61.128.1'): 22333,
('229.112.177.58', '47.61.128.1'): 12993,
('240.97.200.0', '108.146.202.184'): 17492,
('247.193.34.56', '243.182.247.102'): 35353,
('55.156.159.21', '124.77.75.86'): 7506,
('55.156.159.21', '157.229.248.255'): 7533,
('55.156.159.21', '97.5.181.76'): 7501,
('97.166.76.88', '65.81.49.125'): 8155}
因此,在一台机器上执行计算密集型且大部分是尴尬并行(正则表达式相对昂贵)的任务时,它们是可比的。
您想要使用 Spark 的原因
- 您想要使用许多机器并与 HDFS 交互
- 混洗操作
您想要使用 dask.bag 的原因
- 安装简单
- 无需处理 JVM 堆大小或配置文件
- 良好的错误报告。Spark 的错误报告包含任何 JVM 解决方案都会有的典型巨型 Java 堆栈跟踪。
- 对于 Python 程序员来说,更容易/更简单地进行开发。其实现代码包括注释在内只有350行。
再说一次,这真的只是一个玩具实验,目的是展示 dask 模型不仅仅是关于数组的。我绝对不想把 Dask 和 Spark 相提并论。
结论
然而,我确实想强调单机并行性的重要性。Dask.bag 很好地针对了这一应用场景,并以一种自然且易于大多数 Python 程序员理解的方式利用了常用硬件。
熟练的开发者可以扩展此功能以使其在分布式内存环境中工作。创建任务依赖图的逻辑与调度器是分开的。
特别感谢 Erik Welch 精心设计了 dask 的优化流程,确保数据流畅。
博客评论由 Disqus 提供支持