特设分布式随机森林 当数组和数据帧不够灵活时
这项工作由 Continuum Analytics 和 XDATA Program 作为 Blaze 项目的一部分提供支持
此文章的截屏视频版本在此处提供:https://www.youtube.com/watch?v=FkPlEqB8AnE
太长不看(TL;DR)。
Dask.distributed 允许您向集群提交单个任务。我们利用此功能结合 Scikit Learn,在分布式表格纽约出租车数据上训练和运行分布式随机森林。
我们的机器学习模型表现不佳,但我们确实学会了如何轻松执行特设计算。
动机
在过去的几篇文章中,我们使用 Dask 集合在集群上分析了数据
通常我们的计算无法整齐地放入 bag、dataframe 或 array 的抽象中。在这种情况下,我们希望像普通代码一样灵活地使用 for 循环,同时仍然拥有集群的计算能力。通过 dask.distributed 任务接口,我们实现了接近此目标的功能。
应用:朴素分布式随机森林算法
作为一个激励性的应用,我们使用单机版 Scikit Learn 库以及 dask.distributed 快速向集群提交单个任务的能力,从头开始构建一个随机森林算法。我们的算法将如下所示
- 从某个外部源(S3)将数据拉取到集群上的多个数据帧中
- 对于每个数据帧,创建并训练一个
RandomForestClassifier
- 将单个测试数据帧分散到所有机器上
- 对于每个
RandomForestClassifier
,在测试数据帧上预测输出 - 通过多数投票聚合来自每个分类器的独立预测。为了避免将过多数据传输到任何一台机器,通过树形归约执行此多数投票。
数据:纽约出租车 2015
正如我们在关于分布式数据帧的博客文章中所述,我们使用了 2015 年所有纽约出租车出行的数据。这些数据在磁盘上约 20GB,在 RAM 中约 60GB。
我们根据其他数字列(如上车和目的地位置、票价明细、距离等)预测每辆出租车的乘客数量。
我们首先在单机上使用少量数据进行此操作,然后在集群上使用整个数据集进行此操作。我们的集群由十二个 m4.xlarge 实例组成(每个 4 个核心,15GB RAM)。
免责声明和剧透警告:我不是机器学习专家。我们的算法表现会非常差。如果您对机器学习感到兴奋,可以停止阅读此处。但是,如果您对如何使用 Dask 构建分布式算法感兴趣,那么您可能想继续阅读,特别是如果您碰巧对机器学习有足够的了解可以改进我朴素的解决方案。
API:submit, map, gather
我们使用少量 dask.distributed 函数来构建我们的计算
futures = executor.scatter(data) # scatter data
future = executor.submit(function, *args, **kwargs) # submit single task
futures = executor.map(function, sequence) # submit many tasks
results = executor.gather(futures) # gather results
executor.replicate(futures, n=number_of_replications)
特别是,像 executor.submit(function, *args)
这样的函数让我们能够每秒向集群发送数千个单独的函数。因为这些函数消耗它们自己的结果,我们可以创建完全在集群上运行的复杂工作流程,并相信分布式调度器会智能地移动数据。
从 S3 加载 Pandas
首先,我们从 Amazon S3 加载数据。我们使用 s3.read_csv(..., collection=False)
函数从 S3 上的 CSV 数据加载集群上的 178 个 Pandas DataFrames。我们获得一个 Future
对象列表,这些对象引用了这些远程数据帧。collection=False
的使用给了我们这个 Future 列表,而不是一个单一的整体 Dask.dataframe 对象。
from distributed import Executor, s3
e = Executor('52.91.1.177:8786')
dfs = s3.read_csv('dask-data/nyc-taxi/2015',
parse_dates=['tpep_pickup_datetime',
'tpep_dropoff_datetime'],
collection=False)
dfs = e.compute(dfs)
这些每一个都是一个轻量级的 Future
,指向集群上的一个 pandas.DataFrame
。
>>> dfs[:5]
[<Future: status: finished, type: DataFrame, key: finalize-a06c3dd25769f434978fa27d5a4cf24b>,
<Future: status: finished, type: DataFrame, key: finalize-7dcb27364a8701f45cb02d2fe034728a>,
<Future: status: finished, type: DataFrame, key: finalize-b0dfe075000bd59c3a90bfdf89a990da>,
<Future: status: finished, type: DataFrame, key: finalize-1c9bb25cefa1b892fac9b48c0aef7e04>,
<Future: status: finished, type: DataFrame, key: finalize-c8254256b09ae287badca3cf6d9e3142>]
如果我们愿意稍等片刻,就可以使用 .result()
方法将来自任何 future 的数据拉回到本地进程。但我们不希望这样做太多,数据传输可能很昂贵,而且我们无法将整个数据集保存在单台机器的内存中。这里我们只拉回其中一个数据帧
>>> df = dfs[0].result()
>>> df.head()
VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | pickup_longitude | pickup_latitude | RateCodeID | store_and_fwd_flag | dropoff_longitude | dropoff_latitude | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2 | 2015-01-15 19:05:39 | 2015-01-15 19:23:42 | 1 | 1.59 | -73.993896 | 40.750111 | 1 | N | -73.974785 | 40.750618 | 1 | 12.0 | 1.0 | 0.5 | 3.25 | 0 | 0.3 | 17.05 |
1 | 1 | 2015-01-10 20:33:38 | 2015-01-10 20:53:28 | 1 | 3.30 | -74.001648 | 40.724243 | 1 | N | -73.994415 | 40.759109 | 1 | 14.5 | 0.5 | 0.5 | 2.00 | 0 | 0.3 | 17.80 |
2 | 1 | 2015-01-10 20:33:38 | 2015-01-10 20:43:41 | 1 | 1.80 | -73.963341 | 40.802788 | 1 | N | -73.951820 | 40.824413 | 2 | 9.5 | 0.5 | 0.5 | 0.00 | 0 | 0.3 | 10.80 |
3 | 1 | 2015-01-10 20:33:39 | 2015-01-10 20:35:31 | 1 | 0.50 | -74.009087 | 40.713818 | 1 | N | -74.004326 | 40.719986 | 2 | 3.5 | 0.5 | 0.5 | 0.00 | 0 | 0.3 | 4.80 |
4 | 1 | 2015-01-10 20:33:39 | 2015-01-10 20:52:58 | 1 | 3.00 | -73.971176 | 40.762428 | 1 | N | -74.004181 | 40.742653 | 2 | 15.0 | 0.5 | 0.5 | 0.00 | 0 | 0.3 | 16.30 |
在单机上训练
首先,让我们用这些少量数据在单机上完成标准的 Scikit Learn fit/predict/score 周期。
from sklearn.ensemble import RandomForestClassifier
from sklearn.cross_validation import train_test_split
df_train, df_test = train_test_split(df)
columns = ['trip_distance', 'pickup_longitude', 'pickup_latitude',
'dropoff_longitude', 'dropoff_latitude', 'payment_type',
'fare_amount', 'mta_tax', 'tip_amount', 'tolls_amount']
est = RandomForestClassifier(n_estimators=4)
est.fit(df_train[columns], df_train.passenger_count)
这会构建一个包含四个决策树的 RandomForestClassifer
,然后用数据中的数字列对其进行训练,尝试预测 passenger_count
列。在单核上训练大约需要 10 秒。现在我们看看在保留的测试数据上表现如何
>>> est.score(df_test[columns], df_test.passenger_count)
0.65808188654721012
这个 65% 的准确率实际上相当糟糕。纽约大约 70% 的出行只有一名乘客,所以“总是猜一”的模型会比我们花哨的随机森林表现更好。
>>> from sklearn.metrics import accuracy_score
>>> import numpy as np
>>> accuracy_score(df_test.passenger_count,
... np.ones_like(df_test.passenger_count))
0.70669390028780987
这正是我在机器学习方面的无知真正要命的地方。很可能有一种简单的方法可以改进这一点。然而,因为我更感兴趣的是展示如何使用 Dask 构建分布式计算,而不是实际进行机器学习,所以我将继续采用这种朴素的方法。剧透警告:我们将进行大量计算,但仍然无法击败“总是猜一”的策略。
使用 executor.map 在集群上进行拟合
首先,我们构建一个函数,它执行我们之前所做的工作,即构建一个随机森林然后在数据帧上进行训练。
def fit(df):
est = RandomForestClassifier(n_estimators=4)
est.fit(df[columns], df.passenger_count)
return est
其次,我们使用标准 e.map(function, sequence)
函数在集群上的所有训练数据帧上调用此函数。这会发送许多小任务供集群运行。我们使用除了最后一个数据帧之外的所有数据帧作为训练数据,并保留最后一个数据帧用于测试。有更原则性的方法可以做到这一点,但我们在这里将继续前进。
train = dfs[:-1]
test = dfs[-1]
estimators = e.map(fit, train)
在所有 177 个数据帧上训练大约需要两分钟,现在我们有 177 个独立的估计器,每个估计器都能够预测特定出行有多少乘客。这种计算的开销相对较小。
在测试数据上进行预测
回想一下,我们保留了一个单独的 future test
,它指向集群上的一个 Pandas 数据帧,该数据帧未用于训练我们的任何 177 个估计器。我们将在集群上的所有 worker 上复制此数据帧,然后要求每个估计器预测此数据集中每次出行的乘客数量。
e.replicate([test], n=48)
def predict(est, X):
return est.predict(X[columns])
predictions = [e.submit(predict, est, test) for est in estimators]
在这里,我们在列表推导式中使用了 executor.submit(function, *args, **kwrags)
函数,以单独启动许多任务。调度器确定何时何地运行这些任务,以获得最佳计算时间和最小数据传输。与所有函数一样,这将返回 futures,如果将来需要,我们可以使用它们收集数据。
开发者注意事项:我们在这里明确进行复制,以利用高效的树状广播算法。这纯粹是性能方面的考虑,没有这一步一切也能正常工作,但明确的广播将 30 秒的通信+计算变成了 2 秒的通信+计算。
通过多数投票聚合预测结果
对于每个估计器,我们现在都有一个针对我们测试数据中所有出行的乘客数量的独立预测。换句话说,对于每次出行,我们有 177 种不同的关于车内有多少乘客的意见。通过将这些意见平均起来,我们希望达到更准确的共识意见。
例如,考虑前四个预测数组
>>> a_few_predictions = e.gather(predictions[:4]) # remote futures -> local arrays
>>> a_few_predictions
[array([1, 2, 1, ..., 2, 2, 1]),
array([1, 1, 1, ..., 1, 1, 1]),
array([2, 1, 1, ..., 1, 1, 1]),
array([1, 1, 1, ..., 1, 1, 1])]
对于第一个出行/列,我们看到四个预测中有三个是关于一名乘客,而一个预测不同意,是关于两名乘客。我们通过取堆叠数组的众数来创建共识意见
from scipy.stats import mode
import numpy as np
def mymode(*arrays):
array = np.stack(arrays, axis=0)
return mode(array)[0][0]
>>> mymode(*a_few_predictions)
array([1, 1, 1, ..., 1, 1, 1])
因此,当我们对这四个预测数组求平均时,我们看到对于这里可见的所有六次出行,一人乘客的多数意见占主导地位。
树形归约
我们可以在所有预测上调用 mymode
函数,如下所示
>>> mode_prediction = e.submit(mymode, *predictions) # this doesn't scale well
不幸的是,这将把我们所有的结果都移动到单台机器上在那里计算众数。这可能会使那台单机不堪重负。
相反,我们将预测结果分批,每组 10 个,对每组求平均,然后对较小的预测结果集重复此过程,直到只剩下一个。这种多步归约称为树形归约。我们可以用几个嵌套循环和 executor.submit
来编写它。这只是众数的一个近似,但它是一个更具可伸缩性的计算。这在大约 1.5 秒内完成。
from toolz import partition_all
while len(predictions) > 1:
predictions = [e.submit(mymode, *chunk)
for chunk in partition_all(10, predictions)]
result = e.gather(predictions)[0]
>>> result
array([1, 1, 1, ..., 1, 1, 1])
最终得分
最后,在集群上完成所有工作后,我们可以看到我们的分布式随机森林算法表现如何。
>>> accuracy_score(result, test.result().passenger_count)
0.67061974451423045
仍然比朴素的“总是猜一”策略差。这恰恰说明,无论您的大数据解决方案多么复杂,都无法替代常识和一点点领域专业知识。
哪些方面做得不够好
和往常一样,我会有一个像这样的部分,坦诚地说哪些方面做得不够好,以及如果时间更充足我会怎么做。
- 显然,这会受益于更多的机器学习知识。对于这个问题,什么方法会更好?
- 我一直在思考集群上复制数据的内存管理。在这个练习中,我们特意复制了测试数据。没有这一步一切也能正常工作,但会慢得多,因为每个 worker 都需要从最初拥有测试数据帧的那个 worker 那里收集数据。复制数据很好,直到开始填满分布式 RAM。思考何时开始清理冗余数据以及何时保留它是很有趣的。
- 来自开源用户和 Continuum 客户的几个人都询问了一个通用的 Dask 机器学习库,类似于 Spark 的 MLlib。理想情况下,未来的 Dask.learn 模块将像 Dask.dataframe 利用 Pandas 一样,利用 Scikit-Learn。如何干净地分解和并行化 Scikit-Learn 算法尚不清楚。
结论
这篇博客文章提供了一个具体示例,使用基本任务提交函数 executor.map
和 executor.submit
来构建一个非平凡的计算。这种方法简单直接且没有限制。就个人而言,这种接口比像 Dask.dataframe 这样的集合更让我兴奋;任意任务提交具有很大的自由度。
链接
blog comments powered by Disqus