这项工作得到了 Continuum AnalyticsXDATA 项目的支持,是 Blaze 项目的一部分

tl;dr: 我们使用纯 Python 项目在集群上分析 JSON 数据。

Dask,一个用于并行计算的 Python 库,现在可以在集群上运行了。在过去的几个月里,我和其他人通过一个新的分布式内存调度器扩展了 dask。这使得 dask 现有的并行算法能够扩展到数十到数百个节点,并将 PyData 的一个子集扩展到分布式计算。在接下来的几周里,我和其他人将撰写关于这个系统的文章。请注意,dask+distributed 正在快速发展,因此 API 可能会有所变化。

今天我们从一个典型的集群计算问题开始,使用 dask.bag 和新的分布式调度器来解析 JSON 记录、过滤和计数事件。我们将在未来的文章中深入探讨更高级的问题。

这篇博文的视频版本可在此处观看:此处

S3 上的 GitHub Archive 数据

GitHub 将其公共事件流数据作为 gzipped 压缩的、行分隔的 JSON 文件发布。这些数据量很大,即使在较大的工作站上也难以轻松地完全载入内存。我们可以从磁盘流式读取,但由于压缩和 JSON 编码,这需要一些时间,从而拖慢了交互使用。对于处理这类数据的交互体验,我们需要一个分布式集群。

设置和数据

我们在 EC2 上配置了九个 m3.2xlarge 节点。每个节点有八个核心和 30GB 内存。在这个集群上,我们配置了一个调度器和九个工作节点(参见设置文档)。(关于启动的更多内容将在后面的文章中介绍。)我们在 S3 的 githubarchive-data 存储桶中有五个月的数据,从 2015-01-01 到 2015-05-31。如果您想在 EC2 上使用这些数据,它们是公开可用的。您可以在 https://www.githubarchive.org/ 下载完整数据集。

第一条记录如下所示

 {'actor': {'avatar_url': 'https://avatars.githubusercontent.com/u/9152315?',
   'gravatar_id': '',
   'id': 9152315,
   'login': 'davidjhulse',
   'url': 'https://api.github.com/users/davidjhulse'},
  'created_at': '2015-01-01T00:00:00Z',
  'id': '2489368070',
  'payload': {'before': '86ffa724b4d70fce46e760f8cc080f5ec3d7d85f',
   'commits': [{'author': {'email': '[email protected]',
      'name': 'davidjhulse'},
     'distinct': True,
     'message': 'Altered BingBot.jar\n\nFixed issue with multiple account support',
     'sha': 'a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81',
     'url': 'https://api.github.com/repos/davidjhulse/davesbingrewardsbot/commits/a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81'}],
   'distinct_size': 1,
   'head': 'a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81',
   'push_id': 536740396,
   'ref': 'refs/heads/master',
   'size': 1},
  'public': True,
  'repo': {'id': 28635890,
   'name': 'davidjhulse/davesbingrewardsbot',
   'url': 'https://api.github.com/repos/davidjhulse/davesbingrewardsbot'},
  'type': 'PushEvent'}

所以我们在 S3 上有一个大型数据集,并且在 EC2 上有一个中等规模的 play 集群,该集群可以以每节点约 100MB/秒的速度访问 S3 数据。我们准备好了。

因此,我们在 S3 上有一个大型数据集,在 EC2 上有一个中等规模的实验集群,每个节点能够以大约 100MB/s 的速度访问 S3 数据。我们准备开始实验了。

实验

>>> from distributed import Executor, s3
>>> e = Executor('54.173.84.107:8786')
>>> e
<Executor: scheduler=54.173.84.107:8786 workers=72 threads=72>

我们在本地笔记本电脑上启动一个 ipython 解释器,并连接到集群上运行的 dask 调度器。为了计时,集群位于东海岸,而本地机器位于加利福尼亚,使用商业宽带互联网。

我们的七十二个工作进程来自九个工作节点,每个节点有八个进程。我们选择进程而不是线程来完成这项任务,因为计算会受到 GIL 的限制。在后面的例子中,我们将更改为线程。

import json
text = s3.read_text('githubarchive-data', '2015-01', compression='gzip')
records = text.map(json.loads)
records = e.persist(records)

我们首先将一个月的数据加载到分布式内存中。

数据以每小时文件的形式存储在 S3 中,格式为 gzipped 编码、行分隔的 JSON。 s3.read_texttext.map 函数会生成 dask.bag 对象,这些对象在惰性构建的任务图中跟踪我们的操作。当我们要求执行器 persist 这个集合时,我们将这些任务发送到调度器,让它们在所有工作节点上并行运行。persist 函数会返回另一个指向远程运行结果的 dask.bag 对象。这个 persist 函数会立即返回,计算则在集群后台异步进行。我们立即恢复了对解释器的控制,而集群则在后台持续工作。

集群下载、解压缩和解析这些数据大约需要 40 秒。如果您观看上面嵌入的视频,您会看到精美的进度条。

>>> records.take(1)
({'actor': {'avatar_url': 'https://avatars.githubusercontent.com/u/9152315?',
   'gravatar_id': '',
   'id': 9152315,
   'login': 'davidjhulse',
   'url': 'https://api.github.com/users/davidjhulse'},
  'created_at': '2015-01-01T00:00:00Z',
  'id': '2489368070',
  'payload': {'before': '86ffa724b4d70fce46e760f8cc080f5ec3d7d85f',
   'commits': [{'author': {'email': '[email protected]',
      'name': 'davidjhulse'},
     'distinct': True,
     'message': 'Altered BingBot.jar\n\nFixed issue with multiple account support',
     'sha': 'a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81',
     'url': 'https://api.github.com/repos/davidjhulse/davesbingrewardsbot/commits/a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81'}],
   'distinct_size': 1,
   'head': 'a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81',
   'push_id': 536740396,
   'ref': 'refs/heads/master',
   'size': 1},
  'public': True,
  'repo': {'id': 28635890,
   'name': 'davidjhulse/davesbingrewardsbot',
   'url': 'https://api.github.com/repos/davidjhulse/davesbingrewardsbot'},
  'type': 'PushEvent'},)

我们请求一条记录。这在约 200 毫秒内返回,速度之快足以让人感觉是即时的。

>>> %time records.pluck('type').frequencies().compute()
CPU times: user 112 ms, sys: 0 ns, total: 112 ms
Wall time: 2.41 s

[('ReleaseEvent', 44312),
 ('MemberEvent', 69757),
 ('IssuesEvent', 693363),
 ('PublicEvent', 14614),
 ('CreateEvent', 1651300),
 ('PullRequestReviewCommentEvent', 214288),
 ('PullRequestEvent', 680879),
 ('ForkEvent', 491256),
 ('DeleteEvent', 256987),
 ('PushEvent', 7028566),
 ('IssueCommentEvent', 1322509),
 ('GollumEvent', 150861),
 ('CommitCommentEvent', 96468),
 ('WatchEvent', 1321546)]

这个特定的事件是一个 'PushEvent'。让我们快速看看所有类型的事件。为了好玩,我们还会计时这次交互

>>> %time records.count().compute()
CPU times: user 134 ms, sys: 133 µs, total: 134 ms
Wall time: 1.49 s

14036706

然后我们计算这个月的总提交次数。

我们看到遍历数据(并执行所有调度开销)需要几秒钟。调度器为每个任务增加大约一毫秒的开销,这里大约有 1000 个分区/文件(GitHub 数据按小时分割,一个月有 730 小时),所以大部分成本都是开销。

调查 Jupyter

>>> jupyter = (records.filter(lambda d: d['repo']['name'].startswith('jupyter/'))
                      .repartition(10))
>>> jupyter = e.persist(jupyter)

我们调查 Project Jupyter 的活动。我们选择这个项目是因为它规模较大,而且我们了解其中涉及的人员,因此可以检查我们的准确性。这将要求我们将数据过滤到一个小得多的子集,然后找出受欢迎的仓库和成员。

>>> %time jupyter.count().compute()
CPU times: user 5.19 ms, sys: 97 µs, total: 5.28 ms
Wall time: 199 ms

747

>>> %time jupyter.take(1)
CPU times: user 7.01 ms, sys: 259 µs, total: 7.27 ms
Wall time: 182 ms

({'actor': {'avatar_url': 'https://avatars.githubusercontent.com/u/26679?',
   'gravatar_id': '',
   'id': 26679,
   'login': 'marksteve',
   'url': 'https://api.github.com/users/marksteve'},
  'created_at': '2015-01-01T13:25:44Z',
  'id': '2489612400',
  'org': {'avatar_url': 'https://avatars.githubusercontent.com/u/7388996?',
   'gravatar_id': '',
   'id': 7388996,
   'login': 'jupyter',
   'url': 'https://api.github.com/orgs/jupyter'},
  'payload': {'action': 'started'},
  'public': True,
  'repo': {'id': 5303123,
   'name': 'jupyter/nbviewer',
   'url': 'https://api.github.com/repos/jupyter/nbviewer'},
  'type': 'WatchEvent'},)

所有记录,无论事件类型如何,都有一个仓库,其名称通常遵循 GitHub 的 'organization/repository' 格式。我们过滤所有以 'jupyter/' 开头的记录。此外,由于这个数据集可能小得多,我们将所有这些记录推送到仅十个分区中。这显著减少了调度开销。persist 调用将此计算任务交给调度器,然后返回我们的集合,该集合指向计算结果。过滤本月的 Jupyter 事件大约需要 7.5 秒。之后对这个子集进行计算感觉非常流畅。

因此,今年的第一个事件是由 'marksteve' 触发的,他在元旦决定关注 'nbviewer' 仓库。

请注意,这些计算大约需要 200 毫秒。从我的本地机器上无法低于这个时间,因此我们可能受限于与如此远的远程位置通信。如果您在玩电子游戏,200 毫秒的延迟不算好,但对于交互式计算来说还不错。

>>> %time jupyter.pluck('repo').pluck('name').distinct().compute()
CPU times: user 2.84 ms, sys: 4.03 ms, total: 6.86 ms
Wall time: 204 ms

['jupyter/dockerspawner',
 'jupyter/design',
 'jupyter/docker-demo-images',
 'jupyter/jupyterhub',
 'jupyter/configurable-http-proxy',
 'jupyter/nbshot',
 'jupyter/sudospawner',
 'jupyter/colaboratory',
 'jupyter/strata-sv-2015-tutorial',
 'jupyter/tmpnb-deploy',
 'jupyter/nature-demo',
 'jupyter/nbcache',
 'jupyter/jupyter.github.io',
 'jupyter/try.jupyter.org',
 'jupyter/jupyter-drive',
 'jupyter/tmpnb',
 'jupyter/tmpnb-redirector',
 'jupyter/nbgrader',
 'jupyter/nbindex',
 'jupyter/nbviewer',
 'jupyter/oauthenticator']

以下是 1 月份涉及的所有 Jupyter 仓库,

>>> %time (jupyter.pluck('actor')
                  .pluck('login')
                  .frequencies()
                  .topk(10, lambda kv: kv[1])
                  .compute())
CPU times: user 8.03 ms, sys: 90 µs, total: 8.12 ms
Wall time: 226 ms

[('rgbkrk', 156),
 ('minrk', 87),
 ('Carreau', 87),
 ('KesterTong', 74),
 ('jhamrick', 70),
 ('bollwyvl', 25),
 ('pkt', 18),
 ('ssanderson', 13),
 ('smashwilson', 13),
 ('ellisonbg', 13)]

以及 GitHub 上最活跃的十个人。

如果您了解这些人的话,这里没有什么太令人惊讶的。

完整数据集

完整五个月的数据量过大,即使对于这个集群来说也无法完全载入内存。当我们使用列表和字典等动态数据结构来表示这样的半结构化数据时,会存在相当大的内存膨胀。如果在此处仔细注意高效的半结构化存储,可以避免我们不得不切换到如此大的集群,但这将是另一篇文章的主题。

相反,我们通过在内存中流动数据,并仅持久化我们关心的记录,来高效地处理这个数据集。分布式 dask 调度器继承自单机 dask 调度器,后者非常擅长在计算过程中流动数据并智能地移除中间结果。

>>> full = (s3.read_text('githubarchive-data', '2015', compression='gzip')
              .map(json.loads)

>>> jupyter = (full.filter(lambda d: d['repo']['name'].startswith('jupyter/'))
                   .repartition(10))

>>> jupyter = e.persist(jupyter)

从用户 API 的角度来看,我们只对 jupyter 数据集调用 persist,而不是对完整的 records 数据集。

在九个 m3.2xlarge 实例上下载、解压缩和解析这五个月的公开可用 GitHub 事件(所有 Jupyter 事件)需要 2 分 36 秒。

>>> jupyter.count().compute()
7065

总共有七千个这样的事件。

>>> %time (jupyter.pluck('repo')
                  .pluck('name')
                  .frequencies()
                  .topk(20, lambda kv: kv[1])
                  .compute())
CPU times: user 6.98 ms, sys: 474 µs, total: 7.46 ms
Wall time: 219 ms

[('jupyter/jupyterhub', 1262),
 ('jupyter/nbgrader', 1235),
 ('jupyter/nbviewer', 846),
 ('jupyter/jupyter_notebook', 507),
 ('jupyter/jupyter-drive', 505),
 ('jupyter/notebook', 451),
 ('jupyter/docker-demo-images', 363),
 ('jupyter/tmpnb', 284),
 ('jupyter/jupyter_client', 162),
 ('jupyter/dockerspawner', 149),
 ('jupyter/colaboratory', 134),
 ('jupyter/jupyter_core', 127),
 ('jupyter/strata-sv-2015-tutorial', 108),
 ('jupyter/jupyter_nbconvert', 103),
 ('jupyter/configurable-http-proxy', 89),
 ('jupyter/hubpress.io', 85),
 ('jupyter/jupyter.github.io', 84),
 ('jupyter/tmpnb-deploy', 76),
 ('jupyter/nbconvert', 66),
 ('jupyter/jupyter_qtconsole', 59)]

我们找出在此期间哪些仓库活动最频繁

我们看到像 jupyterhub 这样的项目在那段时间非常活跃,而令人惊讶的是,nbconvert 的活动相对较少。

本地数据

>>> %time L = jupyter.compute()
CPU times: user 4.74 s, sys: 10.9 s, total: 15.7 s
Wall time: 30.2 s

Jupyter 数据非常小,很容易容纳在单台机器中。让我们将数据转移到本地机器上,以便比较时间

>>> from toolz.curried import pluck, frequencies, topk, pipe
>>> %time pipe(L, pluck('repo'), pluck('name'), frequencies,
               dict.items, topk(20, key=lambda kv: kv[1]), list)
CPU times: user 11.8 ms, sys: 0 ns, total: 11.8 ms
Wall time: 11.5 ms

[('jupyter/jupyterhub', 1262),
 ('jupyter/nbgrader', 1235),
 ('jupyter/nbviewer', 846),
 ('jupyter/jupyter_notebook', 507),
 ('jupyter/jupyter-drive', 505),
 ('jupyter/notebook', 451),
 ('jupyter/docker-demo-images', 363),
 ('jupyter/tmpnb', 284),
 ('jupyter/jupyter_client', 162),
 ('jupyter/dockerspawner', 149),
 ('jupyter/colaboratory', 134),
 ('jupyter/jupyter_core', 127),
 ('jupyter/strata-sv-2015-tutorial', 108),
 ('jupyter/jupyter_nbconvert', 103),
 ('jupyter/configurable-http-proxy', 89),
 ('jupyter/hubpress.io', 85),
 ('jupyter/jupyter.github.io', 84),
 ('jupyter/tmpnb-deploy', 76),
 ('jupyter/nbconvert', 66),
 ('jupyter/jupyter_qtconsole', 59)]

下载数据出人意料地耗时,但一旦数据到达本地,我们就可以使用基本的 Python 更快地进行迭代。

这里的差异是 20 倍,这很好地提醒我们,一旦你不再处理大型问题,最好放弃分布式系统,在本地进行操作。

结论

下载、解压缩、解析、过滤和计数 JSON 记录是新的词频统计问题。这是任何人都会遇到的第一个问题。幸运的是,它既容易解决又是常见情况。太棒了!

在这里,我们看到 dask+distributed 能够很好地处理常见情况,而且使用纯 Python 技术栈。通常,Python 用户依赖于 Hadoop/Spark/Storm 等 JVM 技术来分发他们的计算。在这里,我们用 Python 分发 Python;这带来了一些可用性上的优势,比如清晰的堆栈跟踪、较低的序列化开销,以及对其他 Pythonic 风格选择的关注。

在接下来的几篇文章中,我打算偏离这个常见情况。大多数“大数据”技术都是为了解决网络公司中遇到的典型数据整理问题,或者考虑到简单的数据库操作而设计的。Python 用户也关心这些事情,但他们的应用范围也很广。在 dask+distributed 的开发中,我们关注常见情况,但也支持在生命科学、物理学和算法科学中常见的非传统工作流程。

通过设计来支持这些更极端的情况,我们解决了一些当前分布式系统中的常见痛点。今天我们看到了低延迟和远程控制;未来我们将看到更多。

存在的问题

  • 在接下来每篇文章的结尾,我都会有一个类似的坦诚部分,描述存在的问题、仍然感觉不完善的地方,或者如果时间充裕我会如何做得不同。

  • dask 和 distributed 的导入方式仍然有些奇怪。它们是两个独立的代码库,但配合得非常好。不幸的是,您需要的功能有时在这个库里,有时在那个库里,初学者用户不太清楚应该去哪里找。例如,我们用于 recordsjupyter 等的集合 dask.bag 在 dask 中,而 s3 模块则在 distributed 库中。在不远的将来,我们需要将它们合并。API 也是如此:dask 集合(records.compute())和分布式执行器(e.compute(records))都有 compute 方法,它们的行为略有不同。

  • 我们缺乏高效的分布式 shuffle 算法。如果您想使用像 .groupby 这样的操作(无论如何您都应该避免使用),这是非常重要的。这里的用户 API 甚至没有清楚地警告用户在分布式情况下缺少此功能,这有点混乱。(在单机上运行良好。)foldby 等高效替代方法可用的。

  • 链接
  • dask,原始项目
  • dask.distributed,为集群计算提供支持的分布式内存调度器
  • dask.bag,本文中使用的用户 API。

请启用 JavaScript 查看 由 Disqus 提供的评论。