这项工作得到了 Continuum AnalyticsXDATA 项目Moore Foundation 数据驱动发现倡议的支持

摘要

本文简要介绍了 Dask 和 TensorFlow 之间潜在的交互,然后通过一个具体的示例,展示了如何将它们结合起来,用于具有中等复杂架构的分布式训练。

本文仓促写成,附带的实验质量不高,详见下文的免责声明。末尾的评论中包含了一个类似的、质量好得多的 XGBoost 示例。

引言

Dask 和 TensorFlow 都提供了 Python 中的分布式计算能力。TensorFlow 擅长深度学习应用,而 Dask 更通用。我们可以将两者结合用于一些应用

  1. 简单数据并行:在训练过程中进行超参数搜索以及对大型数据集预测已训练好的模型,使用 Dask 分布这些任务都非常简单,就像使用任何分布式计算系统(Hadoop/Spark/Flink/等)一样简单。我们不会过多讨论这个主题。它应该很直接。
  2. 部署: TensorFlow 一个常见的痛点是其设置自动化程度不高。这困扰着所有分布式系统,尤其是在各种集群管理器上运行的系统(更多信息请参见集群部署博文)。幸运的是,如果你已经有一个 Dask 集群在运行,那么在其之上、在同一进程中启动一个分布式 TensorFlow 网络就非常简单了。
  3. 预处理:我们使用 dask.dataframe 或 dask.array 进行数据预处理,然后将数据交给 TensorFlow 进行训练。如果 Dask 和 TensorFlow 位于同一进程中,那么这种数据传输将非常高效。通过协同工作,我们可以构建高效且通用的深度学习流水线。

在这篇博文中,我们非常简要地探讨了第一种情况:简单并行。然后更深入地介绍一个在更复杂场景下使用 Dask 和 TensorFlow 的实验。我们将发现,由于 TensorFlow 的设置非常合理以及 Dask 在高级场景下的灵活性,我们可以轻松实现一个相当复杂的 workflow。

动机与免责声明

分布式深度学习正在从根本上改变人类解决一些非常困难的计算问题的方式,例如自然语言翻译、语音转文本、图像识别等。然而,分布式深度学习也受到公众热情的影响,这可能会扭曲其效用的形象。分布式深度学习并非总是解决大多数问题的正确选择。原因有二:

  1. 专注于单机计算通常能更好地利用时间。模型设计、GPU 硬件等因素的影响可能比扩展规模更大。对于深度学习的新手来说,观看在线视频讲座系列可能比阅读这篇博文更能有效地利用时间。
  2. 如果你拥有有限的数据,逻辑回归和梯度提升树等传统机器学习技术可能比深度学习更有效。它们有时也能提供有价值的可解释性结果。

无论如何,即使分布式深度学习与你的应用无关,也有一些具体的收获:

  1. TensorFlow 可以通过 Python 直接设置
  2. Dask 开箱即用,具有足够的灵活性,可以支持复杂的设置和 workflows
  3. 我们将看到一个典型的分布式学习方法的例子,它超越了深度学习。

此外,作者不声称自己在深度学习方面拥有专业知识,这篇博文也是仓促写就的。

简单并行

大多数并行计算都很简单。我们可以轻松地将一个函数应用于大量数据,可能略有变动。在深度学习的场景中,这可以实现几种常见 workflows

  1. 构建许多不同的模型,在相同数据上分别进行训练,然后选择性能最好的那个。使用 dask 的 concurrent.futures 接口,这看起来像下面这样:

    # Hyperparameter search
    client = Client('dask-scheduler-address:8786')
    scores = client.map(train_and_evaluate, hyper_param_list, data=data)
    best = client.submit(max, scores)
    best.result()
    
  2. 给定一个已经训练好的模型,使用它来预测大量数据的结果。这里我们使用一个大数据集合,例如 dask.dataframe

    # Distributed prediction
    
    df = dd.read_parquet('...')
    ... # do some preprocessing here
    df['outcome'] = df.map_partitions(predict)
    

如果你对 Dask 和 TensorFlow(或任何其他机器学习库如 scikit-learn)有适度的了解,这些技术相对简单,所以我现在将忽略它们,专注于更复杂的情况。

感兴趣的读者可能会发现这篇关于TensorFlow 和 Spark 的博文很有价值。这是一篇很好的文章,更详细地介绍了这两种技术。

一个分布式 TensorFlow 应用

我们将复制这个 TensorFlow 示例,它使用多台机器训练一个内存可容纳的模型,并使用参数服务器进行协调。我们的 TensorFlow 网络将有三种不同类型的服务器:

distributed TensorFlow training graph

  1. Worker(工作节点):它们将获取更新的参数,消费训练数据,并使用这些数据生成更新发送回参数服务器
  2. Parameter Server(参数服务器):它们将保存模型参数,并根据需要与 Worker 同步
  3. Scorer(评分节点):它将定期使用验证/测试数据评估当前参数,并发出当前的 cross_entropy 分数,以查看系统运行状况。

当模型可以适应一台机器时,这是一种相当典型的方法;但当我们需要使用多台机器加速训练或因为数据量过大时,也会采用此方法。

我们将使用 TensorFlow 来完成所有实际的训练和评分工作。我们将使用 Dask 来完成其他所有事情。具体来说,我们将做以下事情:

  1. 使用 dask.array 准备数据
  2. 将 TensorFlow worker 设置为长期运行的任务
  3. 在分数仍然不佳时,将数据从 Dask 馈送给 TensorFlow
  4. 让 TensorFlow 使用其自身网络处理训练

使用 Dask.array 准备数据

对于这个玩具示例,我们将只使用 TensorFlow 附带的 mnist 数据。但是,我们将通过在集群中多次连接这些数据来人为地增大它

def get_mnist():
    from tensorflow.examples.tutorials.mnist import input_data
    mnist = input_data.read_data_sets('/tmp/mnist-data', one_hot=True)
    return mnist.train.images, mnist.train.labels

import dask.array as da
from dask import delayed

datasets = [delayed(get_mnist)() for i in range(20)]  # 20 versions of same dataset
images = [d[0] for d in datasets]
labels = [d[1] for d in datasets]

images = [da.from_delayed(im, shape=(55000, 784), dtype='float32') for im in images]
labels = [da.from_delayed(la, shape=(55000, 10), dtype='float32') for la in labels]

images = da.concatenate(images, axis=0)
labels = da.concatenate(labels, axis=0)

>>> images
dask.array<concate..., shape=(1100000, 784), dtype=float32, chunksize=(55000, 784)>

images, labels = c.persist([images, labels])  # persist data in memory

这为我们提供了一个中等大小的分布式数组,包含大约一百万张小图像。如果需要,我们可以使用普通的 dask.array 构造来检查或清理这些数据

im = images[1].compute().reshape((28, 28))
plt.imshow(im, cmap='gray')

mnist number 3

im = images.mean(axis=0).compute().reshape((28, 28))
plt.imshow(im, cmap='gray')

mnist mean

im = images.var(axis=0).compute().reshape((28, 28))
plt.imshow(im, cmap='gray')

mnist var

这展示了如何使用 Dask 集合在并行处理数据,进行清理、预处理和特征生成后,再将其发送给 TensorFlow。在我们的简单示例中,我们实际上没有做这些,但在更实际的场景中这非常有用。

最后,在对所有数据的分布式数组进行预处理后,我们将把图像和标签收集在一起,并将其分批成更小的块。当情况变得复杂时,我们再次使用一些 dask.array 构造和dask.delayed

images = images.rechunk((10000, 784))
labels = labels.rechunk((10000, 10))

images = images.to_delayed().flatten().tolist()
labels = labels.to_delayed().flatten().tolist()
batches = [delayed([im, la]) for im, la in zip(images, labels)]

batches = c.compute(batches)

现在我们有几百对 NumPy 数组存储在分布式内存中,等待发送给 TensorFlow worker。

在 Dask worker 旁设置 TensorFlow worker

Dask worker 只是普通的 Python 进程。TensorFlow 可以从一个普通的 Python 进程启动。我们在这里创建了一个小函数,它利用 Dask 运行长期任务和维护用户定义状态的能力,在 Dask worker 旁启动 TensorFlow 服务器。总共约 80 行代码(包括注释和文档字符串),它允许我们在 Dask 之上定义我们的 TensorFlow 网络,如下所示:

pip install git+https://github.com/mrocklin/dask-tensorflow
from dask.distibuted import Client  # we already had this above
client = Client('dask-scheduler-address:8786')

from dask_tensorflow import start_tensorflow
tf_spec, dask_spec = start_tensorflow(client, ps=1, worker=4, scorer=1)

>>> tf_spec.as_dict()
{'ps': ['192.168.100.1:2227'],
 'scorer': ['192.168.100.2:2222'],
 'worker': ['192.168.100.3:2223',
            '192.168.100.4:2224',
            '192.168.100.5:2225',
            '192.168.100.6:2226']}

>>> dask_spec
{'ps': ['tcp://192.168.100.1:34471'],
 'scorer': ['tcp://192.168.100.2:40623'],
 'worker': ['tcp://192.168.100.3:33075',
            'tcp://192.168.100.4:37123',
            'tcp://192.168.100.5:32839',
            'tcp://192.168.100.6:36822']}

这在 Dask worker 进程中启动了三组 TensorFlow 服务器。TensorFlow 将管理自己的通信,但与 Dask 在同一机器和同一共享内存空间中并存(请注意,在上述规格中,IP 地址匹配,但端口不同)。

这还设置了一个普通的 Python 队列,Dask 可以通过它安全地向 TensorFlow 发送信息。我们将通过这种方式在这两个服务之间发送训练数据批次。

定义 TensorFlow 模型并分配角色

现在是博文作者专业知识不足的部分了。我将直接复制、粘贴并修改 TensorFlow 文档中的一个现成示例。这是一个解决此问题的简单模型,而且我完全有可能犯转录错误。但这仍然应该能说明问题。你可以安全地忽略大部分代码。Dask 的内容在底部会再次变得有趣

import math
import tempfile
import time
from queue import Empty

IMAGE_PIXELS = 28
hidden_units = 100
learning_rate = 0.01
sync_replicas = False
replicas_to_aggregate = len(dask_spec['worker'])

def model(server):
    worker_device = "/job:%s/task:%d" % (server.server_def.job_name,
                                         server.server_def.task_index)
    task_index = server.server_def.task_index
    is_chief = task_index == 0

    with tf.device(tf.train.replica_device_setter(
                      worker_device=worker_device,
                      ps_device="/job:ps/cpu:0",
                      cluster=tf_spec)):

        global_step = tf.Variable(0, name="global_step", trainable=False)

        # Variables of the hidden layer
        hid_w = tf.Variable(
            tf.truncated_normal(
                [IMAGE_PIXELS * IMAGE_PIXELS, hidden_units],
                stddev=1.0 / IMAGE_PIXELS),
            name="hid_w")
        hid_b = tf.Variable(tf.zeros([hidden_units]), name="hid_b")

        # Variables of the softmax layer
        sm_w = tf.Variable(
            tf.truncated_normal(
                [hidden_units, 10],
                stddev=1.0 / math.sqrt(hidden_units)),
            name="sm_w")
        sm_b = tf.Variable(tf.zeros([10]), name="sm_b")

        # Ops: located on the worker specified with task_index
        x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
        y_ = tf.placeholder(tf.float32, [None, 10])

        hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
        hid = tf.nn.relu(hid_lin)

        y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
        cross_entropy = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))

        opt = tf.train.AdamOptimizer(learning_rate)

        if sync_replicas:
            if replicas_to_aggregate is None:
                replicas_to_aggregate = num_workers
            else:
                replicas_to_aggregate = replicas_to_aggregate

            opt = tf.train.SyncReplicasOptimizer(
                      opt,
                      replicas_to_aggregate=replicas_to_aggregate,
                      total_num_replicas=num_workers,
                      name="mnist_sync_replicas")

        train_step = opt.minimize(cross_entropy, global_step=global_step)

        if sync_replicas:
            local_init_op = opt.local_step_init_op
            if is_chief:
                local_init_op = opt.chief_init_op

            ready_for_local_init_op = opt.ready_for_local_init_op

            # Initial token and chief queue runners required by the sync_replicas mode
            chief_queue_runner = opt.get_chief_queue_runner()
            sync_init_op = opt.get_init_tokens_op()

        init_op = tf.global_variables_initializer()
        train_dir = tempfile.mkdtemp()

        if sync_replicas:
          sv = tf.train.Supervisor(
              is_chief=is_chief,
              logdir=train_dir,
              init_op=init_op,
              local_init_op=local_init_op,
              ready_for_local_init_op=ready_for_local_init_op,
              recovery_wait_secs=1,
              global_step=global_step)
        else:
          sv = tf.train.Supervisor(
              is_chief=is_chief,
              logdir=train_dir,
              init_op=init_op,
              recovery_wait_secs=1,
              global_step=global_step)

        sess_config = tf.ConfigProto(
            allow_soft_placement=True,
            log_device_placement=False,
            device_filters=["/job:ps", "/job:worker/task:%d" % task_index])

        # The chief worker (task_index==0) session will prepare the session,
        # while the remaining workers will wait for the preparation to complete.
        if is_chief:
          print("Worker %d: Initializing session..." % task_index)
        else:
          print("Worker %d: Waiting for session to be initialized..." %
                task_index)

        sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)

        if sync_replicas and is_chief:
          # Chief worker will start the chief queue runner and call the init op.
          sess.run(sync_init_op)
          sv.start_queue_runners(sess, [chief_queue_runner])

        return sess, x, y_, train_step, global_step, cross_entropy


def ps_task():
    with local_client() as c:
        c.worker.tensorflow_server.join()


def scoring_task():
    with local_client() as c:
        # Scores Channel
        scores = c.channel('scores', maxlen=10)

        # Make Model
        server = c.worker.tensorflow_server
        sess, _, _, _, _, cross_entropy = model(c.worker.tensorflow_server)

        # Testing Data
        from tensorflow.examples.tutorials.mnist import input_data
        mnist = input_data.read_data_sets('/tmp/mnist-data', one_hot=True)
        test_data = {x: mnist.validation.images,
                     y_: mnist.validation.labels}

        # Main Loop
        while True:
            score = sess.run(cross_entropy, feed_dict=test_data)
            scores.append(float(score))

            time.sleep(1)


def worker_task():
    with local_client() as c:
        scores = c.channel('scores')
        num_workers = replicas_to_aggregate = len(dask_spec['worker'])

        server = c.worker.tensorflow_server
        queue = c.worker.tensorflow_queue

        # Make model
        sess, x, y_, train_step, global_step, _= model(c.worker.tensorflow_server)

        # Main loop
        while not scores or scores.data[-1] > 1000:
            try:
                batch = queue.get(timeout=0.5)
            except Empty:
                continue

            train_data = {x: batch[0],
                          y_: batch[1]}

            sess.run([train_step, global_step], feed_dict=train_data)

这里定义的最后三个函数:ps_taskscorer_taskworker_task,是我们在三组 TensorFlow 服务器类型上分别运行的函数。参数服务器任务只是启动一个长期运行的任务,并被动地加入 TensorFlow 网络

def ps_task():
    with local_client() as c:
        c.worker.tensorflow_server.join()

评分节点任务打开一个名为“scores”的worker 间通道进行通信,创建 TensorFlow 模型,然后每秒用验证数据评估模型的当前状态。它通过 worker 间通道报告得分。

def scoring_task():
    with local_client() as c:
        scores = c.channel('scores')  #  inter-worker channel

        # Make Model
        sess, _, _, _, _, cross_entropy = model(c.worker.tensorflow_server)

        ...

        while True:
            score = sess.run(cross_entropy, feed_dict=test_data)
            scores.append(float(score))
            time.sleep(1)

worker 任务创建模型,监听 Dask-TensorFlow 队列以获取新的训练数据,并持续训练直到上次报告的分数足够好。

def worker_task():
    with local_client() as c:
        scores = c.channel('scores')

        queue = c.worker.tensorflow_queue

        # Make model
        sess, x, y_, train_step, global_step, _ = model(c.worker.tensorflow_server)

        while scores.data[-1] > 1000:
            batch = queue.get()

            train_data = {x: batch[0],
                          y_: batch[1]}

            sess.run([train_step, global_step], feed_dict=train_data)

我们在具有相应 TensorFlow 服务器的 Dask worker 上启动这些任务(参见上面的 tf_specdask_spec

ps_tasks = [c.submit(ps_task, workers=worker)
            for worker in dask_spec['ps']]

worker_tasks = [c.submit(worker_task, workers=addr, pure=False)
                for addr in dask_spec['worker']]

scorer_task = c.submit(scoring_task, workers=dask_spec['scorer'][0])

这将启动长期运行的任务,它们就在那里静候外部刺激

long running TensorFlow tasks

最后,我们构建一个函数,将 Dask.array(本文开头)中的每一批数据转储到 worker 上的 Dask-TensorFlow 队列中。我们确保只在 Dask worker 具有相应的 TensorFlow 训练 worker 的地方运行这些任务

from distributed.worker_client import get_worker

def transfer_dask_to_tensorflow(batch):
    worker = get_worker()
    worker.tensorflow_queue.put(batch)

dump = c.map(transfer_dask_to_tensorflow, batches,
             workers=dask_spec['worker'], pure=False)

如果需要,我们可以通过订阅相同的 worker 间通道来在本地会话中跟踪进度

scores = c.channel('scores')

我们可以反复使用此方法将数据转储到 worker 中,直到它们收敛。

while scores.data[-1] > 1000:
    dump = c.map(transfer_dask_to_tensorflow, batches,
                 workers=dask_spec['worker'], pure=False)
    wait(dump)

结论

我们讨论了一种非平凡的方式来使用 TensorFlow 实现分布式机器学习。我们通过以下几种方式使用 Dask 来支持 TensorFlow:

  1. 轻松设置 TensorFlow 网络
  2. 准备和清理数据
  3. 协调进度和停止条件

我们发现 Dask 和 TensorFlow 可以很好地协同工作非常方便。Dask 在支持 TensorFlow 的同时并没有成为阻碍。这两个库都能在 Python 和更广泛的 PyData 生态(NumPy/Pandas)中良好运行,这使得它们之间的数据传输非常简单,无需昂贵或复杂的技巧。

此外,我们无需特意去集成这两个系统。无需单独进行合作来在核心层面集成 Dask 和 TensorFlow。相反,它们的设计方式使得这种类型的交互无需特别关注或努力即可实现。

这也是我写的第一篇从 Dask 角度使用了更复杂特性的博文,例如长期运行任务或使用通道在 worker 之间发布状态。这些更高级的特性在创建更复杂/定制化的并行计算系统时非常宝贵,这类系统常在公司内部见到。

我们可以做得更好的地方

从深度学习的角度来看,这个例子既基础又不完整。如果能在一个比 MNIST 更大、更复杂的数据集上进行训练就好了。此外,如果能看到训练随时间的变化以及使用不同数量 worker 的性能影响也会很好。为这篇博文辩护,我只能说 Dask 不应该影响任何这些扩展结果,因为在这些阶段 TensorFlow 完全掌握控制权,并且 TensorFlow 已经有大量的关于扩展的公开信息。

总的来说,这个实验是在一个周末下午完成的,博文在之后几个小时内写就。如果有人有兴趣进行并发表一篇关于使用 TensorFlow 和 Dask 的更认真的分布式深度学习实验,我很乐意在 Dask 方面提供支持。我认为这里有很多关于最佳实践的知识可以学习。

致谢

以下个人为这篇博文的撰写做出了贡献:

  • Stephan Hoyer 贡献了关于 TensorFlow 在实践中如何使用的讨论以及部署方面的具体经验。
  • Will WarnerErik Welch 都提供了宝贵的编辑和语言建议

博客评论由 Disqus 提供支持