这项工作由 Continuum AnalyticsXDATA Program 作为 Blaze Project 的一部分提供支持。

太长不看 我们使用 dask 来构建一个并行 Python 列表。

引言

这是使用 NumPy 和 dask 构建核外 nd-array 系列文章中的第七篇。您可以在此处查看这些文章

  1. 简单的任务调度,
  2. 前端可用性
  3. 一个多线程调度器
  4. 矩阵乘法基准测试
  5. 溢出到磁盘
  6. 切片与堆叠

今天我们暂时不讨论 ND 数组,而是展示任务调度如何处理其他集合,例如简单的 Python 对象 list

非结构化数据

通常在我们拥有 ndarraytable/DataFrame 之前,我们会有像日志文件这样的非结构化数据。在这种情况下,经过调整的语言子集(例如 numpy/pandas)是不够的,我们需要完整的 Python 语言。

我处理那些不方便处理的大量日志文件的常用方法是使用 Python 流式迭代器,并在一台大型机器上结合使用 多进程或 IPython Parallel。在讨论 toolz 时,我经常撰写文章或发表演讲来介绍这个工作流程。

当引入许多进程时,这种工作流程对于大多数用户来说会变得复杂。为了解决这个问题,我们将我们常用的技巧构建到一个新的 dask.Bag 集合中。

Bag

正如 dask.array 模仿 NumPy 操作(例如矩阵乘法、切片)一样,dask.bag 也模仿标准库中找到的函数式操作,如 mapfilterreduce,以及 toolz 中找到的许多流式函数。

  • Dask array = NumPy + 线程
  • Dask bag = Python/Toolz + 进程

示例

这是必不可少的词频统计示例

>>> from dask.bag import Bag

>>> b = Bag.from_filenames('data/*.txt')

>>> def stem(word):
...     """ Stem word to primitive form """
...     return word.lower().rstrip(",.!:;'-\"").lstrip("'\"")

>>> dict(b.map(str.split).map(concat).map(stem).frequencies())
{...}

我们使用所有的核心,并在每个核心上通过内存进行流式处理。我们使用 multiprocessing,但经过一些工作可以做得更高级。

有许多更大、更强大的系统也具有类似的 API,特别是 SparkDPark。如果您有大数据并且需要使用非常多的机器,那么您应该停止阅读本文并去安装它们。

我主要创建 dask.bag 是因为

  1. 鉴于 dask.array 上已经完成的工作,创建它非常容易
  2. 我通常只需要多进程 + 一台高性能机器
  3. 我想要一个易于通过 pip 安装且不使用 JVM 的东西

但再说一遍,如果您有大数据,那么这不适合您。

设计

和之前一样,一个 Bag 只是一个持有任务的字典,外加一些元数据。

>>> d = {('x', 0): (range, 5),
...      ('x', 1): (range, 5),
...      ('x', 2): (range, 5)}

>>> from dask.bag import Bag
>>> b = Bag(d, 'x', npartitions=3)

通过这种方式,我们将一个集合分解为

[0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4]

三个独立的部分

[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]

当我们抽象地操作大型集合时...

>>> b2 = b.map(lambda x: x * 10)

... 我们会生成新的任务来操作每个组件。

>>> b2.dask
{('x', 0): (range, 5),
 ('x', 1): (range, 5),
 ('x', 2): (range, 5)}
 ('bag-1', 0): (map, lambda x: x * 10, ('x', 0)),
 ('bag-1', 1): (map, lambda x: x * 10, ('x', 1)),
 ('bag-1', 2): (map, lambda x: x * 10, ('x', 2))}

当我们只需要具体结果(调用 list)时,我们会启动一个调度器来执行生成的任务依赖图。

>>> list(b2)
[0, 10, 20, 30, 40, 0, 10, 20, 30, 40, 0, 10, 20, 30, 40]

更复杂的操作会产生更复杂的 dasks。请注意,dask 代码相当“Lisp 式”。幸运的是,这些 dasks 是内部的;用户无需与之交互。

>>> iseven = lambda x: x % 2 == 0
>>> b3 = b.filter(iseven).count().dask
{'bag-3': (sum, [('bag-2', 1), ('bag-2', 2), ('bag-2', 0)]),
 ('bag-2', 0): (count,
                (filter, iseven, (range, 5))),
 ('bag-2', 1): (count,
                (filter, iseven, (range, 5))),
 ('bag-2', 2): (count,
                (filter, iseven, (range, 5)))}

Bag 当前的接口提供以下操作

all             frequencies         min
any             join                product
count           map                 std
filter          map_partitions      sum
fold            max                 topk
foldby          mean                var

对 bag 的操作会创建任务依赖图。我们最终会并行执行这些图。

执行

我们重新利用用于数组的线程调度器来支持 multiprocessing,以便即使在纯 Python 代码上也能提供并行性。我们注意避免不必要的数据传输。上面列出的操作都不需要大量的通信。值得注意的是,我们没有任何混洗或散列/聚集的概念。

我们使用 dill 来确保正确序列化函数并收集/报告错误,这两个问题困扰着 Python 中对 multiprocessing 的简单使用

>>> list(b.map(lambda x: x * 10))  # This works!
[0, 10, 20, 30, 40, 0, 10, 20, 30, 40, 0, 10, 20, 30, 40]

>>> list(b.map(lambda x: x / 0))   # This errs gracefully!
ZeroDivisionError:  Execption in remote Process

integer division or modulo by zero

Traceback:
    ...

这些技巧消除了对用户专业知识的需求。

高效的最佳点

我认为以下配置中存在一个高效的最佳点

  1. 纯 Python 函数
  2. 流式/延迟计算数据
  3. 多进程
  4. 一台大型机器或非正式集群中的几台机器

这种设置很常见,能够处理TB级的工作流程。根据我的经验,人们很少采用这种方式。他们会使用单线程内存中的 Python,直到它崩溃,然后寻求 Hadoop/Spark 等大数据基础设施,但这样做会带来相对较高的生产力开销。

您的工作站的扩展能力可能超出您的想象。

示例

这里有大约1GB的网络流数据,记录了1996年UC伯克利校区内哪些计算机连接了哪些其他计算机。

846890339:661920 846890339:755475 846890340:197141 168.237.7.10:1163 83.153.38.208:80 2 8 4294967295 4294967295 846615753 176 2462 39 GET 21068906053917068819..html HTTP/1.0

846890340:989181 846890341:2147 846890341:2268 13.35.251.117:1269 207.83.232.163:80 10 0 842099997 4294967295 4294967295 64 1 38 GET 20271810743860818265..gif HTTP/1.0

846890341:80714 846890341:90331 846890341:90451 13.35.251.117:1270 207.83.232.163:80 10 0 842099995 4294967295 4294967295 64 1 38 GET 38127854093537985420..gif HTTP/1.0

这实际上相对干净。许多字段是空格分隔的(不是全部),而且我已经编译并运行了用于从原始格式解压数据的十年旧 C 代码。

让我们使用 Bag 和正则表达式来解析它。

In [1]: from dask.bag import Bag, into

In [2]: b = Bag.from_filenames('UCB-home-IP*.log')

In [3]: import re

In [4]: pattern = """
   ...: (?P<request_time>\d+:\d+)
   ...: (?P<response_start>\d+:\d+)
   ...: (?P<response_end>\d+:\d+)
   ...: (?P<client_ip>\d+\.\d+\.\d+\.\d+):(?P<client_port>\d+)
   ...: (?P<server_ip>\d+\.\d+\.\d+\.\d+):(?P<server_port>\d+)
   ...: (?P<client_headers>\d+)
   ...: (?P<server_headers>\d+)
   ...: (?P<if_modified_since>\d+)
   ...: (?P<response_header_length>\d+)
   ...: (?P<response_data_length>\d+)
   ...: (?P<request_url_length>\d+)
   ...: (?P<expires>\d+)
   ...: (?P<last_modified>\d+)
   ...: (?P<method>\w+)
   ...: (?P<domain>\d+..)\.(?P<extension>\w*)(?P<rest_of_url>\S*)
   ...: (?P<protocol>.*)""".strip().replace('\n', '\s+')

In [5]: prog = re.compile(pattern)

In [6]: records = b.map(prog.match).map(lambda m: m.groupdict())

这会立即返回。我们只在必要时计算结果。我们通过调用 list 触发计算。

In [7]: list(records.take(1))
Out[7]:
[{'client_headers': '2',
  'client_ip': '168.237.7.10',
  'client_port': '1163',
  'domain': '21068906053917068819.',
  'expires': '2462',
  'extension': 'html',
  'if_modified_since': '4294967295',
  'last_modified': '39',
  'method': 'GET',
  'protocol': 'HTTP/1.0',
  'request_time': '846890339:661920',
  'request_url_length': '176',
  'response_data_length': '846615753',
  'response_end': '846890340:197141',
  'response_header_length': '4294967295',
  'response_start': '846890339:755475',
  'rest_of_url': '',
  'server_headers': '8',
  'server_ip': '83.153.38.208',
  'server_port': '80'}]

因为 bag 采用延迟计算,所以这个小结果也会立即返回。

为了展示其深度,我们找到连接次数最多的十个客户端/服务器对。

In [8]: counts = records.pluck(['client_ip', 'server_ip']).frequencies()

In [9]: %time list(counts.topk(10, key=lambda x: x[1]))
CPU times: user 11.2 s, sys: 1.15 s, total: 12.3 s
Wall time: 50.4 s
Out[9]:
[(('247.193.34.56', '243.182.247.102'), 35353),
 (('172.219.28.251', '47.61.128.1'), 22333),
 (('240.97.200.0', '108.146.202.184'), 17492),
 (('229.112.177.58', '47.61.128.1'), 12993),
 (('146.214.34.69', '119.153.78.6'), 12554),
 (('17.32.139.174', '179.135.20.36'), 10166),
 (('97.166.76.88', '65.81.49.125'), 8155),
 (('55.156.159.21', '157.229.248.255'), 7533),
 (('55.156.159.21', '124.77.75.86'), 7506),
 (('55.156.159.21', '97.5.181.76'), 7501)]

与 Spark 的比较

首先,将 PySpark 在本地运行与此进行比较既愚蠢又不公平。PySpark 在分布式环境中提供了更多功能。

In [1]: import pyspark

In [2]: sc = pyspark.SparkContext('local')

In [3]: from glob import glob
In [4]: filenames = sorted(glob('UCB-home-*.log'))
In [5]: rdd = sc.parallelize(filenames, numSlices=4)

In [6]: import re
In [7]: pattern = ...
In [8]: prog = re.compile(pattern)

In [9]: lines = rdd.flatMap(lambda fn: list(open(fn)))
In [10]: records = lines.map(lambda line: prog.match(line).groupdict())
In [11]: ips = records.map(lambda rec: (rec['client_ip'], rec['server_ip']))

In [12]: from toolz import topk
In [13]: %time dict(topk(10, ips.countByValue().items(), key=1))
CPU times: user 1.32 s, sys: 52.2 ms, total: 1.37 s
Wall time: 1min 21s
Out[13]:
{('146.214.34.69', '119.153.78.6'): 12554,
 ('17.32.139.174', '179.135.20.36'): 10166,
 ('172.219.28.251', '47.61.128.1'): 22333,
 ('229.112.177.58', '47.61.128.1'): 12993,
 ('240.97.200.0', '108.146.202.184'): 17492,
 ('247.193.34.56', '243.182.247.102'): 35353,
 ('55.156.159.21', '124.77.75.86'): 7506,
 ('55.156.159.21', '157.229.248.255'): 7533,
 ('55.156.159.21', '97.5.181.76'): 7501,
 ('97.166.76.88', '65.81.49.125'): 8155}

因此,在一台机器上执行计算密集型且大部分是尴尬并行(正则表达式相对昂贵)的任务时,它们是可比的。

您想要使用 Spark 的原因

  • 您想要使用许多机器并与 HDFS 交互
  • 混洗操作

您想要使用 dask.bag 的原因

  • 安装简单
  • 无需处理 JVM 堆大小或配置文件
  • 良好的错误报告。Spark 的错误报告包含任何 JVM 解决方案都会有的典型巨型 Java 堆栈跟踪。
  • 对于 Python 程序员来说,更容易/更简单地进行开发。其实现代码包括注释在内只有350行。

再说一次,这真的只是一个玩具实验,目的是展示 dask 模型不仅仅是关于数组的。我绝对不想把 Dask 和 Spark 相提并论。

结论

然而,我确实想强调单机并行性的重要性。Dask.bag 很好地针对了这一应用场景,并以一种自然且易于大多数 Python 程序员理解的方式利用了常用硬件。

熟练的开发者可以扩展此功能以使其在分布式内存环境中工作。创建任务依赖图的逻辑与调度器是分开的。

特别感谢 Erik Welch 精心设计了 dask 的优化流程,确保数据流畅。


博客评论由 Disqus 提供支持