Dask 与 Celery
本文比较了两个 Python 分布式任务处理系统:Dask.distributed 和 Celery。
免责声明:技术比较很难做得好。我偏向 Dask,对正确的 Celery 实践不够了解。请牢记这一点。欢迎 Celery 专家的批评指正。
Celery 是一个用 Python 构建的分布式任务队列,被 Python 社区广泛用于基于任务的工作负载。
Dask 是一个在 PyData 社区流行的并行计算库,它发展出了一个相当复杂的分布式任务调度器。本文探讨了 Dask.distributed 是否可用于 Celery 式的问题。
比较技术项目很难,原因在于作者有偏见,同时每个项目的范围都可能相当大。这使得作者会倾向于展示自身优势的特性。幸运的是,一位 Celery 用户在 Github 上询问了 Dask 的比较情况,他们列举了一些具体的特性
- 处理多个队列
- Canvas (Celery 的工作流)
- 速率限制
- 重试
这些特性提供了一个机会,可以从 Celery 用户的角度而不是 Dask 开发者的角度来探讨 Dask/Celery 的比较。
在本文中,我将指出几个主要区别,然后分别介绍两个项目中的 Celery hello world,然后讨论 Dask 中如何实现或未实现这些被要求的特性。这种基于几个特性的 anecdotal 比较应该能给我们一个大致的对比。
最大区别:Worker 状态和通信
首先,最大的区别(从我的角度来看)是 Dask worker 会保留中间结果并在 worker 之间互相通信,而在 Celery 中,所有结果都流回中心权威。这种差异在构建大型并行数组和 dataframes(Dask 的最初目的)时至关重要,因为我们需要利用 worker 进程的内存和 worker 间通信带宽。像 Dask 这样的计算系统会这样做,而像 Celery/Airflow/Luigi 这样的数据工程系统则不会。这是 Dask 最初未构建在 Celery/Airflow/Luigi 之上的主要原因。
这绝不是对 Celery/Airflow/Luigi 的贬低。通常它们用于这种差异无关紧要的场景,并将精力集中在 Dask 同样不关心或不擅长的几个特性上。任务通常从某种全局可访问的存储中读取数据,例如数据库或 S3,要么返回非常小的结果,要么将较大的结果放回全局存储中。
现在我脑海中的问题是:在 Celery 等项目通常用于的更传统的松散任务调度问题中,Dask 是否能成为一个有用的解决方案?其优点和缺点是什么?
Hello World
首先,我们将介绍Celery 入门第一步,分别在 Celery 和 Dask 中执行,并进行比较
Celery
我按照 Celery 快速入门指南进行,使用 Redis 而不是 RabbitMQ,因为手边正好有 Redis。
# tasks.py
from celery import Celery
app = Celery('tasks', broker='redis://localhost', backend='redis')
@app.task
def add(x, y):
return x + y
redis-server
celery -A tasks worker --loglevel=info
In [1]: from tasks import add
In [2]: %time add.delay(1, 1).get() # submit and retrieve roundtrip
CPU times: user 60 ms, sys: 8 ms, total: 68 ms
Wall time: 567 ms
Out[2]: 2
In [3]: %%time
...: futures = [add.delay(i, i) for i in range(1000)]
...: results = [f.get() for f in futures]
...:
CPU times: user 888 ms, sys: 72 ms, total: 960 ms
Wall time: 1.7 s
Dask
我们使用 dask.distributed 的 concurrent.futures 接口执行相同的工作负载,使用默认的单机部署。
In [1]: from distributed import Client
In [2]: c = Client()
In [3]: from operator import add
In [4]: %time c.submit(add, 1, 1).result()
CPU times: user 20 ms, sys: 0 ns, total: 20 ms
Wall time: 20.7 ms
Out[4]: 2
In [5]: %%time
...: futures = [c.submit(add, i, i) for i in range(1000)]
...: results = c.gather(futures)
...:
CPU times: user 328 ms, sys: 12 ms, total: 340 ms
Wall time: 369 ms
比较
- 函数:在 Celery 中,您需要在服务器端提前注册计算任务。如果您提前知道要运行什么(例如数据工程工作负载中通常如此),并且不希望允许用户在集群上运行任意代码带来安全风险,这是很好的。对想要实验的用户来说不太方便。在 Dask 中,我们在用户端选择要运行的函数,而不是在服务器端。这在数据探索中非常关键,但在更保守/安全的计算环境中可能会成为障碍。
- 设置:在 Celery 中,我们依赖其他广泛部署的系统,如 RabbitMQ 或 Redis。Dask 依赖更底层的 Tornado TCP IOStreams 和 Dask 自己的定制路由逻辑。这使得 Dask 的设置变得微不足道,但也可能不那么健壮。Redis 和 RabbitMQ 都解决了许多实践中遇到的问题,依赖它们能带来信心。
- 性能:它们都能在亚秒级延迟和毫秒级开销下运行。Dask 的开销略低,但对于数据工程工作负载,这个级别的差异通常不重要。Dask 的延迟低一个数量级,这取决于您的应用,可能非常重要。例如,如果您从用户点击网站上的按钮触发任务,20 毫秒通常在交互预算内,而 500 毫秒则感觉有点慢。
简单依赖
这个问题询问了Canvas,即 Celery 的依赖管理系统。
任务通常依赖于其他任务的结果。两个系统都有帮助用户表达这些依赖关系的方式。
Celery
apply_async
方法有一个 link=
参数,可用于在其他任务运行后调用任务。例如,我们可以在 Celery 中如下计算 (1 + 2) + 3
add.apply_async((1, 2), link=add.s(3))
Dask.distributed
使用 Dask 的 concurrent.futures API,在 submit 调用中可以使用 futures,依赖关系是隐式的。
x = c.submit(add, 1, 2)
y = c.submit(add, x, 3)
我们也可以使用dask.delayed 修饰器来修饰任意函数,然后使用接近普通 Python 的方式。
@dask.delayed
def add(x, y):
return x + y
x = add(1, 2)
y = add(x, 3)
y.compute()
比较
我更喜欢 Dask 的解决方案,但这很主观。
复杂依赖
Celery
Celery 包含丰富的术语,用于以更复杂的方式连接任务,包括 groups
, chains
, chords
, maps
, starmaps
等。更多细节请参考其 Canvas 文档(Canvas 是他们用来构建复杂工作流的系统):http://docs.celeryproject.org/en/master/userguide/canvas.html
例如,这里我们 chord 多个 adds,然后用一个 sum 跟随它们。
In [1]: from tasks import add, tsum # I had to add a sum method to tasks.py
In [2]: from celery import chord
In [3]: %time chord(add.s(i, i) for i in range(100))(tsum.s()).get()
CPU times: user 172 ms, sys: 12 ms, total: 184 ms
Wall time: 1.21 s
Out[3]: 9900
Dask
Dask 允许在 submit 调用中使用 futures 的技巧实际上非常强大。Dask 实际上不需要任何额外的原语。它可以使用普通的 submit 调用非常自然地实现 Canvas 中表达的所有模式。
In [4]: %%time
...: futures = [c.submit(add, i, i) for i in range(100)]
...: total = c.submit(sum, futures)
...: total.result()
...:
CPU times: user 52 ms, sys: 0 ns, total: 52 ms
Wall time: 60.8 ms
或者使用Dask.delayed
futures = [add(i, i) for i in range(100)]
total = dask.delayed(sum)(futures)
total.result()
多个队列
在 Celery 中,有队列的概念,任务可以提交到队列,并且 worker 可以订阅这些队列。一个示例用例是拥有只处理“高优先级”任务的“高优先级”worker。每个 worker 都可以订阅高优先级队列,但某些 worker 会独占性地订阅该队列。
celery -A my-project worker -Q high-priority # only subscribe to high priority
celery -A my-project worker -Q celery,high-priority # subscribe to both
celery -A my-project worker -Q celery,high-priority
celery -A my-project worker -Q celery,high-priority
这就像 TSA 快速安检通道或杂货店的快速结账通道。
Dask 有几个类似的概念,或者必要时可以满足这种需求,但没有完全对应的事物。
首先,对于上述常见情况,任务有优先级。这些优先级通常由调度器设置以最小化内存使用,但用户可以直接覆盖,使某些任务优先于其他任务执行。
其次,您可以将任务限制在部分 worker 上运行。这最初是为 Hadoop FileSystem (HDFS) 等数据本地存储系统或带有 GPU 等特殊硬件的集群设计的,但也可以用于队列场景。这抽象层面不太一样,但必要时可以达到相同的效果。对于每个任务,您可以限制它可以运行的 worker 池。
相关的文档在这里:https://distributed.dask.org.cn/en/latest/locality.html#user-control
任务重试
Celery 允许任务在失败时自行重试。
@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
try:
twitter = Twitter(oauth)
twitter.update_status(tweet)
except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
raise self.retry(exc=exc)
# Example from http://docs.celeryproject.org/en/latest/userguide/tasks.html#retrying
遗憾的是,Dask 目前不支持此功能(参见未解决的问题)。所有函数都被认为是纯粹的且最终的。如果任务出错,异常将被视为真正结果。不过这可能会改变;这已经被请求过几次了。
在此之前,用户需要在函数内部实现重试逻辑(无论如何,这也不是一个糟糕的主意)。
@app.task(bind=True)
def send_twitter_status(self, oauth, tweet, n_retries=5):
for i in range(n_retries):
try:
twitter = Twitter(oauth)
twitter.update_status(tweet)
return
except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
pass
速率限制
Celery 允许您为任务指定速率限制,大概是为了帮助您避免因频繁调用外部 API 而被阻止。
@app.task(rate_limit='1000/h')
def query_external_api(...):
...
Dask 肯定没有内置此功能,也没有计划添加。但是,可以在 Dask 外部相当容易地实现。例如,Dask 支持在任意 Python 队列上映射函数。如果您传入一个队列,那么该队列中所有当前和未来的元素都将被映射处理。您可以在客户端使用纯 Python 轻松处理速率限制,方法是限制输入队列的速率。Dask 低延迟和低开销使得在客户端管理这种逻辑相当容易。虽然不如内置方便,但仍然很直接。
>>> from queue import Queue
>>> q = Queue()
>>> out = c.map(query_external_api, q)
>>> type(out)
Queue
最终想法
基于对 Celery 的非常浅显的探索,我冒昧地断言 Dask 可以处理 Celery 的工作负载,如果您不深入其高级 API 的话。然而,所有这些高级 API 实际上都非常重要。Celery 在这个领域演化,并开发了大量特性,解决了反复出现的问题。这段历史为用户节省了大量时间。Dask 在一个截然不同的领域演化,并发展了一套截然不同的技巧。Dask 的许多技巧足够通用,只需稍加努力就能解决 Celery 问题,但仍需要额外的步骤。我看到人们正在将这种努力应用于解决问题,我认为看看会产生什么结果将很有趣。
亲自体验 Celery API 对我来说是一次很好的经历。我认为 Celery 中有一些好的概念可以为未来的 Dask 开发提供启发。
博客评论由 Disqus 提供