Dask 2.2.0 版本发布
我很高兴宣布 Dask 2.2 版本发布了。这是一个重要的版本,包含错误修复和新特性。上一篇博客发布的版本是 2019-06-22 的 2.0 版本。这篇博文概述了自上次发布以来的显著变化。
您可以使用 conda 安装 Dask
conda install dask
或从 PyPI 使用 pip 安装
pip install dask[complete] --upgrade
完整的更新日志可在此处查看
显著变化
一如既往,变化太多无法在此一一列出,我们将重点介绍读者可能感兴趣或破坏旧行为的几个变化。我们特别讨论以下几点:
- Parquet 重写
- 为 Client 和日志提供更友好的 HTML 输出
- 在 Dask-ML 中使用 Hyperband 进行超参数选择
- 将字节 I/O 处理从 Dask 移至 FSSpec
- 随处可见的 async/await,以及为开发者提供的更清晰的设置
- 新的 SSH 部署解决方案
1 - Parquet 重写
如今,Dask DataFrame 可以使用 fastparquet 或 Apache Arrow 读取和写入 Parquet 数据。
import dask.dataframe as dd
df = dd.read_parquet("/path/to/mydata.parquet", engine="arrow")
# or
df = dd.read_parquet("/path/to/mydata.parquet", engine="fastparquet")
在 Dask 中同时支持这两个库对用户很有帮助,但也带来了一些维护负担,特别是考虑到每个库多年来都与 Dask Dataframe 共同发展。Dask Dataframe 与这些库之间的契约关系错综复杂,使得快速演进变得困难。
为了解决这个问题,我们将 Dask 对 Parquet 读取器/写入器的期望正式化为一个更规范的 Parquet 引擎契约。这降低了维护成本,使每个项目都能独立开发,并允许新的引擎出现。
在 RAPIDS cuDF 库的一个 PR 中,已经有了一个 GPU 加速的 Parquet 读取器。
因此,我们也能够修复许多长期存在的错误,并改进了两个引擎的功能。
开发期间来自 Sarah Bird 的一些有趣引言
我目前正在测试。到目前为止一切顺利。我可以在几秒钟内加载我的数据集,包含 1800 个分区。这是游戏规则的改变者!
以及
我现在成功地处理了一个包含 74,000 个分区且没有元数据的数据集。打开数据集并执行 df.head() 需要 7 - 30 秒。(大概取决于 s3fs 缓存是否冷)。这太棒了!以前这根本不可能。
API 保持不变,但功能应该更流畅。
感谢 Rick Zamora、Martin Durant 在此做了大部分工作,也感谢 Sarah Bird、Wes McKinney 和 Mike McCarty 提供的指导和评审。
2 - 为 Client 和日志提供更友好的 HTML 输出
from dask.distributed import Client
client = Client()
客户端
|
集群
|
client.cluster.logs()
调度器
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:60275
distributed.scheduler - INFO - dashboard at: 127.0.0.1:8787
distributed.scheduler - INFO - Register tcp://127.0.0.1:60281
distributed.scheduler - INFO - Register tcp://127.0.0.1:60282
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:60281
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:60282
distributed.scheduler - INFO - Register tcp://127.0.0.1:60285
distributed.scheduler - INFO - Register tcp://127.0.0.1:60286
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:60285
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:60286
distributed.scheduler - INFO - Receive client connection: Client-6b6ba1d0-b3bd-11e9-9bd0-acde48001122
tcp://127.0.0.1:60281
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:60281
distributed.worker - INFO - Listening to: tcp://127.0.0.1:60281
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 3
distributed.worker - INFO - Memory: 4.29 GB
distributed.worker - INFO - Local Directory: /Users/mrocklin/workspace/dask/dask-worker-space/worker-c4_44fym
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
tcp://127.0.0.1:60282
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:60282
distributed.worker - INFO - Listening to: tcp://127.0.0.1:60282
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 3
distributed.worker - INFO - Memory: 4.29 GB
distributed.worker - INFO - Local Directory: /Users/mrocklin/workspace/dask/dask-worker-space/worker-quu4taje
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
tcp://127.0.0.1:60285
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:60285
distributed.worker - INFO - Listening to: tcp://127.0.0.1:60285
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 3
distributed.worker - INFO - Memory: 4.29 GB
distributed.worker - INFO - Local Directory: /Users/mrocklin/workspace/dask/dask-worker-space/worker-ll4cozug
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
tcp://127.0.0.1:60286
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:60286
distributed.worker - INFO - Listening to: tcp://127.0.0.1:60286
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 3
distributed.worker - INFO - Memory: 4.29 GB
distributed.worker - INFO - Local Directory: /Users/mrocklin/workspace/dask/dask-worker-space/worker-lpbkkzj6
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
注意:在除 IE 和 Edge 以外的任何浏览器中显示效果更好
感谢 Jacob Tomlinson 完成此项工作。
3 - 使用 HyperBand 进行超参数选择
Dask-ML 1.0 已发布,其中包含一个新的 HyperBandSearchCV
元估计器,用于超参数优化。这可以作为 RandomizedSearchCV
的替代方案,通过不浪费时间在没有前景的超参数上,从而在更短的时间内找到相似的超参数。
>>> import numpy as np
>>> from dask_ml.model_selection import HyperbandSearchCV
>>> from dask_ml.datasets import make_classification
>>> from sklearn.linear_model import SGDClassifier
>>> X, y = make_classification(chunks=20)
>>> est = SGDClassifier(tol=1e-3)
>>> param_dist = {'alpha': np.logspace(-4, 0, num=1000),
>>> 'loss': ['hinge', 'log', 'modified_huber', 'squared_hinge'],
>>> 'average': [True, False]}
>>> search = HyperbandSearchCV(est, param_dist)
>>> search.fit(X, y, classes=np.unique(y))
>>> search.best_params_
{'loss': 'log', 'average': False, 'alpha': 0.0080502}
感谢 Scott Sievert。您可以观看他的 SciPy 2019 演讲,更深入地了解这个话题。
4 - 将字节 I/O 处理从 Dask 移至 FSSpec
我们将 Dask 用于读写原始数据到不同存储系统的内部代码剥离到一个单独的项目 fsspec 中。
这里有一个小例子
import fsspec
with fsspec.open("https://github.com/dask/dask/edit/master/README.rst") as f:
print(f.read(1000))
with fsspec.open("s3://bucket/myfile.csv") as f:
df = pd.read_csv(f)
with fsspec.open("hdfs:///path/to/myfile.csv") as f:
df = pd.read_csv(f)
with fsspec.open("gcs://bucket/myfile.csv") as f:
df = pd.read_csv(f)
Dask 用于从 HDFS、S3、GCS、Azure 和其他远程存储系统读写字节数据的 I/O 基础设施可以说是当今 Python 中最统一和全面的。通过 s3fs、gcsfs 和 hdfs3 pyarrow.hdfs 等工具,可以轻松地以 Pythonic 方式读写各种远程存储系统中的数据。
早期我们就决定将这些代码独立于 Dask 主体代码库之外,这就是它们成为独立项目的原因。这个选择使得 Pandas、Zarr 等其他库能够从这项工作中受益,而无需严格依赖 Dask。然而,Dask 内部仍然有一些代码有助于将它们统一起来。我们已将这些代码移至外部项目 fsspec,其中包含 Dask 过去提供的所有集中化代码,以及一个关于远程数据系统为了兼容性应具备哪些特征的正式规范。这也帮助我们与其他项目(如 Arrow)统一工作。
特别感谢 Martin Durant 多年来引领 Dask 的 I/O 基础设施,并立即着手剥离 fsspec
的工作。
您可以在此处阅读更多关于 FSSpec 及其从 Dask 剥离的信息。
5 - 随处可见的 Async/Await,以及为开发者提供的更清晰的设置
在 Dask 2.0 中,我们放弃了对 Python 2 的支持,现在仅支持 Python 3.5 及更高版本。这使我们能够采用 async 和 await 语法进行并发执行,而不是使用 yield
的旧式协程方法。这些差异最初主要体现在语法上,但随着我们清理代码库,触发了许多实质性的改进。启动和停止内部 Scheduler、Worker、Nanny 和 Client 对象现在更加统一,减少了潜在的细微错误。
在Python API 设置文档中对此有更详细的讨论,并且在这些文档的代码示例中有所体现。
import asyncio
from dask.distributed import Scheduler, Worker, Client
async def f():
async with Scheduler() as s:
async with Worker(s.address) as w1, Worker(s.address) as w2:
async with Client(s.address, asynchronous=True) as client:
future = client.submit(lambda x: x + 1, 10)
result = await future
print(result)
asyncio.get_event_loop().run_until_complete(f())
由于这些以及其他内部清理工作,我们持续集成中的间歇性测试失败已经消失,开发者的心情也很愉快 :)
6 - 新的 SSH 集群
我们新增了第二个 SSH 集群部署解决方案。它看起来像这样
from distributed.deploy.ssh2 import SSHCluster # this will move in future releases
cluster = SSHCluster(
hosts=["host1", "host2", "host3", "host4"],
# hosts=["localhost"] * 4 # if you want to try this out locally,
worker_kwargs={"nthreads": 4},
scheduler_kwargs={},
connect_kwargs={"known_hosts": None}
)
注意:此对象是实验性的,将来可能会更改,恕不另行通知。
我们做这项工作有两个原因
-
我们的用户调查显示,惊人数量的用户通过 SSH 部署 Dask。据传闻,他们似乎只是 SSH 到机器上,然后使用 Dask 的标准命令行界面)
我们想要一个比这更容易的解决方案。
-
我们一直在努力将各种部署解决方案(如 Kubernetes、SLURM、Yarn/Hadoop)的代码统一到一个中心代码库中,而拥有一个简单的 SSHCluster 作为测试用例,对测试和实验非常有价值。
另请注意,Dask 目前已有一个更成熟的 dask-ssh 解决方案
我们预计部署方式的统一将是未来几个月开发工作的核心主题。
致谢
自上次发布博文以来,Dask 已发布了两个版本。自 6 月 30 日发布 2.0 版本以来,以下人员对以下仓库做出了贡献:
- dask/dask
- Brett Naul
- Daniel Saxton
- David Brochart
- Davis Bennett
- Elliott Sales de Andrade
- GALI PREM SAGAR
- James Bourbeau
- Jim Crist
- Loïc Estève
- Martin Durant
- Matthew Rocklin
- Matthias Bussonnier
- Natalya Rapstine
- Nick Becker
- Peter Andreas Entschev
- Ralf Gommers
- Richard (Rick) Zamora
- Sarah Bird
- Sean McKenna
- Tom Augspurger
- Willi Rath
- Xavier Holt
- andrethrill
- asmith26
- msbrown47
- tshatrov
- dask/distributed
- Christian Hudon
- Gabriel Sailer
- Jacob Tomlinson
- James Bourbeau
- Jim Crist
- Martin Durant
- Matthew Rocklin
- Pierre Glaser
- Russ Bubley
- tjb900
- dask/dask-jobqueue
- Guillaume Eynard-Bontemps
- Leo Singer
- Loïc Estève
- Matthew Rocklin
- Stuart Berg
- dask/dask-examples
- Chris White
- Ian Rose
- Matthew Rocklin
- dask/dask-mpi
- Anderson Banihirwe
- Kevin Paul
- Matthew Rocklin
- dask/dask-kubernetes
- Matthew Rocklin
- Tom Augspurger
- dask/dask-ml
- Roman Yurchak
- Tom Augspurger
- dask/dask-yarn
- Al Johri
- Jim Crist
- dask/dask-examples
博客评论由 Disqus 提供支持