两则利用 Scikit Learn 与 Dask 的简单方法
这项工作由 Continuum Analytics、XDATA Program 以及来自 Moore Foundation 的数据驱动发现倡议支持。
摘要
本文介绍了两种简单的方法,利用 Dask 并行化 Scikit-Learn 操作,无论是在单台计算机上还是在集群中。
- 使用 Dask Joblib 后端
- 使用
dklearn
项目提供的Pipeline
、GridSearchCV
和RandomSearchCV
的即插即用替代品。
对于不耐烦的读者,它们看起来像这样:
### Joblib
from joblib import parallel_backend
with parallel_backend('dask.distributed', scheduler_host='scheduler-address:8786'):
# your now-cluster-ified sklearn code here
### Dask-learn pipeline and GridSearchCV drop-in replacements
# from sklearn.grid_search import GridSearchCV
from dklearn.grid_search import GridSearchCV
# from sklearn.pipeline import Pipeline
from dklearn.pipeline import Pipeline
然而,这两种技术都不完美。它们是最容易尝试的方法,但并非总是最佳解决方案。这篇博文侧重于易于实现的成果。
Joblib
Scikit-Learn 已经通过 Joblib 在多核 CPU 上实现了并行化。Joblib 是一个简单、强大且成熟的库,提供了可扩展的 map 操作。这是一个在不使用 sklearn 的情况下单独使用 Joblib 的简单示例:
# Sequential code
from time import sleep
def slowinc(x):
sleep(1) # take a bit of time to simulate real work
return x + 1
>>> [slowinc(i) for i in range(10)] # this takes 10 seconds
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# Parallel code
from joblib import Parallel, delayed
>>> Parallel(n_jobs=4)(delayed(slowinc)(i) for i in range(10)) # this takes 3 seconds
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Dask 用户会认出 delayed
函数修饰符。Dask 从 Joblib “偷”来了 delayed
装饰器。
Scikit-learn 的许多并行算法在内部使用 Joblib。如果我们将 Joblib 扩展到集群,那么就可以立即从启用 Joblib 的 Scikit-learn 函数中获得额外的并行性。
分布式 Joblib
幸运的是,Joblib 提供了一个接口,允许其他并行系统介入并充当执行引擎。我们可以使用 parallel_backend
上下文管理器来实现这一点,以便在附近的集群中使用数百或数千个核心运行。
import distributed.joblib
from joblib import parallel_backend
with parallel_backend('dask.distributed', scheduler_host='scheduler-address:8786'):
print(Parallel()(delayed(slowinc)(i) for i in list(range(100))))
对于 Scikit-learn 用户来说,这里的核心价值在于 Scikit-learn 代码中已经使用了 joblib.Parallel
,因此这个技巧适用于你已有的 Scikit-learn 代码。
因此,我们可以使用 Joblib 在我们的多核处理器上正常并行化:
estimator = GridSearchCV(n_jobs=4, ...) # use joblib on local multi-core processor
或者,我们可以将 Joblib 与 Dask.distributed 结合使用,在多节点集群上进行并行化:
with parallel_backend('dask.distributed', scheduler_host='scheduler-address:8786'):
estimator = GridSearchCV(...) # use joblib with Dask cluster
(文末将有一个更详细的示例)
局限性
Joblib 在 Scikit-learn 的许多算法中都有使用,但并非全部。通常,接受 n_jobs=
参数的任何操作都可能适用。
从 Dask 的角度来看,Joblib 的接口并不理想。例如,它总是将中间结果收集回主进程,而不是将它们留在集群上直到必要时才取回。对于计算密集型操作来说这还可以接受,但确实增加了一些不必要的通信开销。此外,Joblib 不支持比并行 map 更复杂的操作,因此可以并行化的算法范围受到一定限制。
尽管如此,考虑到 Joblib 加速的工作流(尤其是在 Scikit-learn 中)的广泛使用,如果你附近有一个集群,这是一个值得尝试的简单方法,可能带来丰厚的回报。
Dask-learn Pipeline 和 Gridsearch
2016 年 7 月,Jim Crist 构建并撰文介绍了一个小型项目,dask-learn。该项目与 SKLearn 开发者合作,旨在探索 Scikit-learn 中哪些部分可以轻易有效地并行化。这项工作迄今为止最富有成效的成果是 Scikit-learn 的 Pipeline、GridSearchCV 和 RandomSearchCV 对象的 Dask 变体,它们能更好地处理嵌套并行。通过使用这些即插即用替代品,Jim 观察到相较于原 SKLearn 代码,速度有了显著提升。
因此,如果你替换以下导入,你可能会获得更好的单线程性能*以及*扩展到集群的能力:
# from sklearn.grid_search import GridSearchCV
from dklearn.grid_search import GridSearchCV
# from sklearn.pipeline import Pipeline
from dklearn.pipeline import Pipeline
以下是来自 Jim 更深入的博文中的一个简单示例:
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=10000,
n_features=500,
n_classes=2,
n_redundant=250,
random_state=42)
from sklearn import linear_model, decomposition
from sklearn.pipeline import Pipeline
from dklearn.pipeline import Pipeline
logistic = linear_model.LogisticRegression()
pca = decomposition.PCA()
pipe = Pipeline(steps=[('pca', pca),
('logistic', logistic)])
#Parameters of pipelines can be set using ‘__’ separated parameter names:
grid = dict(pca__n_components=[50, 100, 150, 250],
logistic__C=[1e-4, 1.0, 10, 1e4],
logistic__penalty=['l1', 'l2'])
# from sklearn.grid_search import GridSearchCV
from dklearn.grid_search import GridSearchCV
estimator = GridSearchCV(pipe, grid)
estimator.fit(X, y)
SKLearn 执行此计算大约需要 40 秒,而 dask-learn 的即插即用替代品大约需要 10 秒。此外,如果你添加以下几行连接到正在运行的集群,整个过程就会扩展开来:
from dask.distributed import Client
c = Client('scheduler-address:8786')
这是我在自己笔记本电脑上运行的一个微型八进程“集群”上计算的实时 Bokeh 图。我在此处使用进程是为了突出进程间通信(红色)的开销。实际上,在同一个单进程中运行此计算大约快 30%。
结论
本文介绍了 scikit-learn 用户利用 Dask 加速其现有工作流的几种简单机制。这些方法并非特别复杂,性能也不是最优的,但它们易于理解和尝试。在未来的博文中,我计划介绍 Dask 加速复杂机器学习工作流的更高级方法。
我们可以做得更好的地方
和往常一样,我简要介绍了哪里出了问题,或者如果有更多时间我们可以做得更好的地方。
- 关于 dask-learn 的 pipeline 和 gridsearch“我们可以做得更好的地方”,请参阅 Jim 博文的底部,那里有更详细的解释。
- Joblib + Dask.distributed 的交互很方便,但在性能上有所损失。Dask 如何在不过于侵入性的情况下帮助 sklearn 代码库尚不明确。
- 对于这篇博文来说,如果在并行硬件上搭建一个实际的集群就更好了。我写得很快(几个小时),所以决定跳过这一步。如果有人愿意进行后续实验并撰写文章,我很乐意发布。
博客评论由 Disqus 提供支持