大规模图像分析中的 Dask 与 ITK
作者:John Kirkham, Matthew Rocklin, Matthew McCormick
执行摘要
本文探讨了如何将 ITK 图像处理工具套件与 Dask Array 并行使用。
我们将介绍….
- 一个简单但常见的例子:对一叠 3d 图像应用去卷积
- 如何让这两个库协同工作的技巧
- 我们遇到的挑战以及未来改进的机会。
一个工作示例
让我们从一个完整的例子开始,对一叠光片显微镜数据应用 Richardson Lucy 去卷积。这与我们在 上一篇关于图像加载的博文 中展示的加载数据相同。您可以在 这里通过 google drive 作为 tiff 文件访问这些数据,并在这里访问相应的点扩散函数图像。
# Load our data from last time¶
import dask.array as da
imgs = da.from_zarr("AOLLSMData_m4_raw.zarr/", "data")
|
此数据集的形状为 (3, 199, 201, 1024, 768)
- 3 个荧光颜色通道,
- 199 个时间点,
- 201 个 z 切片,
- y 维度上有 1024 像素,以及
- x 维度上有 768 像素。
# Load our Point Spread Function (PSF)
import dask.array.image
psf = dask.array.image.imread("AOLLSMData/m4/psfs_z0p1/*.tif")[:, None, ...]
|
# Convert data to float32 for computation¶
import numpy as np
imgs = imgs.astype(np.float32)
# Note: the psf needs to be sampled with a voxel spacing
# consistent with the image's sampling
psf = psf.astype(np.float32)
# Apply Richardson-Lucy Deconvolution¶
def richardson_lucy_deconvolution(img, psf, iterations=1):
""" Apply deconvolution to a single chunk of data """
import itk
img = img[0, 0, ...] # remove leading two length-one dimensions
psf = psf[0, 0, ...] # remove leading two length-one dimensions
image = itk.image_view_from_array(img) # Convert to ITK object
kernel = itk.image_view_from_array(psf) # Convert to ITK object
deconvolved = itk.richardson_lucy_deconvolution_image_filter(
image,
kernel_image=kernel,
number_of_iterations=iterations
)
result = itk.array_from_image(deconvolved) # Convert back to Numpy array
result = result[None, None, ...] # Add back the leading length-one dimensions
return result
out = da.map_blocks(richardson_lucy_deconvolution, imgs, psf, dtype=np.float32)
# Create a local cluster of dask worker processes
# (this could also point to a distributed cluster if you have it)
from dask.distributed import LocalCluster, Client
cluster = LocalCluster(n_workers=20, threads_per_process=1)
client = Client(cluster) # now dask operations use this cluster by default
# Trigger computation and store
out.to_zarr("AOLLSMData_m4_raw.zarr", "deconvolved", overwrite=True)
因此在上面的例子中,我们….
- 将 Zarr 和 TIFF 文件中的数据加载到多分块的 Dask 数组中
- 构建一个函数,将 ITK 例程应用于每个块
- 使用 dask.array.map_blocks 函数将该函数应用于整个 Dask 数组。
- 将结果存储回 Zarr 格式
从成像科学家的角度来看,这里新的技术是 dask.array.map_blocks 函数。给定一个由许多 NumPy 数组组成的 Dask 数组和一个函数,map_blocks
会并行地将该函数应用于每个块,并返回一个 Dask 数组作为结果。当你想要以简单的方式对许多块应用操作时,它是一个很棒的工具。因为 Dask 数组只是由 Numpy 数组构成,所以它很容易将 Dask 与科学 Python 生态系统的其余部分结合起来。
构建正确的函数
然而在这种情况下,由于 ITK 和 Dask Array 中都存在一些特性问题,构建正确的 Numpy -> Numpy 函数面临一些挑战。让我们再次看看我们的函数
def richardson_lucy_deconvolution(img, psf, iterations=1):
""" Apply deconvolution to a single chunk of data """
import itk
img = img[0, 0, ...] # remove leading two length-one dimensions
psf = psf[0, 0, ...] # remove leading two length-one dimensions
image = itk.image_view_from_array(img) # Convert to ITK object
kernel = itk.image_view_from_array(psf) # Convert to ITK object
deconvolved = itk.richardson_lucy_deconvolution_image_filter(
image,
kernel_image=kernel,
number_of_iterations=iterations
)
result = itk.array_from_image(deconvolved) # Convert back to Numpy array
result = result[None, None, ...] # Add back the leading length-one dimensions
return result
out = da.map_blocks(richardson_lucy_deconvolution, imgs, psf, dtype=np.float32)
这比我们想要的要长。相反,我们更倾向于直接使用 itk
函数,而无需所有前后的步骤。
deconvolved = da.map_blocks(itk.richardson_lucy_deconvolution_image_filter, imgs, psf)
我们函数中的额外步骤是什么,为什么它们是必要的?
-
转换为和从 ITK Image 对象: ITK 函数不接受和生成 Numpy 数组,它们接受和生成自己的
Image
数据结构。有方便的函数可以来回转换,所以处理起来很简单,但每次都需要处理。请参阅 ITK #1136 获取一个功能请求,该请求将消除此步骤的需要。 -
解包和打包单例维度: 我们的 Dask 数组的形状如下
Array Shape: (3, 199, 201, 1024, 768) Chunk Shape: (1, 1, 201, 1024, 768)
因此我们的
map_blocks
函数获取块大小的 NumPy 数组,即(1, 1, 201, 1024, 768)
。然而,我们的 ITK 函数旨在处理 3d 数组,而不是 5d 数组,所以我们需要移除前两个维度。img = img[0, 0, ...] # remove leading two length-one dimensions psf = psf[0, 0, ...] # remove leading two length-one dimensions
然后当我们完成后,Dask 期望像它提供的那样获取 5d 数组,所以我们将这些单例维度添加回去
result = result[None, None, ...] # Add back the leading length-one dimensions
同样,这对于习惯 NumPy 切片语法的用户来说很简单,但每次都需要完成。这给我们的开发过程增加了一些阻力,也是可能让用户感到困惑的另一个步骤。
但是如果你习惯于处理这类问题,那么如果你想在集群上并行化 ITK 操作,ITK 和 map_blocks
可以是一个强大的组合。
定义一个 Dask 集群
上面我们使用 dask.distributed.LocalCluster
在我们的本地工作站上设置了 20 个单线程工作进程
from dask.distributed import LocalCluster, Client
cluster = LocalCluster(n_workers=20, threads_per_process=1)
client = Client(cluster) # now dask operations use this cluster by default
如果您有分布式资源,这就是您连接它的地方。您可以将 LocalCluster
替换为 Dask 的其他部署选项 之一。
此外,我们发现需要使用许多单线程进程而不是一个多线程进程,因为 ITK 函数似乎仍然持有 GIL。这没关系,我们只需要意识到这一点,以便适当地设置我们的 Dask 工作进程,为每个进程分配一个线程以实现最大效率。请参阅 ITK #1134,这是一个关于此主题的活跃 Github issue。
序列化
我们在跨多个进程使用 ITK 库时遇到了一些困难,因为库本身序列化得不好。(如果您不理解这是什么意思,不用担心)。我们在 ITK #1090 中解决了一些问题,但仍然存在一些问题。
我们通过将导入语句包含在函数内部而不是外部来规避这个问题。
def richardson_lucy_deconvolution(img, psf, iterations=1):
import itk # <--- we work around serialization issues by importing within the function
这样每个任务都会单独导入 itk,我们就绕过了这个问题。
尝试 Scikit-Image
我们还尝试了 Scikit-Image 中的 Richardson Lucy 去卷积操作。Scikit-Image 以更偏向 Scipy/Numpy 原生而闻名,但并非总是像 ITK 那样快。我们的经验证实了这种看法。
首先,我们很高兴看到 scikit-image 函数可以立即与 map_blocks
一起工作,没有任何打包/解包、维度或序列化问题
import skimage.restoration
out = da.map_blocks(skimage.restoration.richardson_lucy, imgs, psf) # just works
因此,这里不需要进行所有那些转换为和从图像对象或移除和添加单例维度的操作。
在性能方面,我们也高兴地看到 Scikit-Image 释放了 GIL,因此在使用少量多线程进程时,我们能够获得非常高的报告 CPU 利用率。然而,尽管 CPU 利用率很高,我们的并行性能却足够差,以至于我们仍然坚持使用 ITK 解决方案,即使它存在一些缺点。关于此的更多信息可在 Github issue scikit-image #4083 中找到。
注意:在单个块上按顺序运行时,ITK 大约需要 2 分钟,而 scikit-image 需要 3 分钟。只有当我们开始并行化时,事情才变得缓慢。
无论如何,我们在此实验中的目标是看看 ITK 和 Dask array 协同工作的效果如何。看到这种顺畅的集成是什么样子令人欣慰,即使只是为了激励 ITK+Dask 关系在未来得到发展。
Numba GUFuncs
除了 da.map_blocks
之外,另一种选择是广义通用函数 (Generalized Universal Functions, gufuncs)。这些函数具有许多神奇的特性,其中之一是它们在 NumPy 数组和 Dask 数组上都能很好地工作。如果像 ITK 或 Scikit-Image 这样的库将其函数转换为 gufuncs,那么用户无需做任何特别的事情就能使用它们。
目前实现 gufuncs 最简单的方法是使用 Numba。我在我们封装的 richardson_lucy
函数上进行了尝试,只是为了展示它的工作原理,以防其他库将来想采用这种方式。
import numba
@numba.guvectorize(
["float32[:,:,:], float32[:,:,:], float32[:,:,:]"], # we have to specify types
"(i,j,k),(a,b,c)->(i,j,k)", # and dimensionality explicitly
forceobj=True,
)
def richardson_lucy_deconvolution(img, psf, out):
# <---- no dimension unpacking!
iterations = 1
image = itk.image_view_from_array(np.ascontiguousarray(img))
kernel = itk.image_view_from_array(np.ascontiguousarray(psf))
deconvolved = itk.richardson_lucy_deconvolution_image_filter(
image, kernel_image=kernel, number_of_iterations=iterations
)
out[:] = itk.array_from_image(deconvolved)
# Now this function works natively on either NumPy or Dask arrays
out = richardson_lucy_deconvolution(imgs, psf) # <-- no map_blocks call!
请注意,我们既没有进行维度解包,也没有调用 map_blocks
。我们的函数现在知道足够的信息,以便 Dask 无需被明确告知要做什么,就能进行并行化。
这给库维护者增加了一些负担,但使用户体验更加顺畅。
GPU 加速
在进行图像处理和 Dask 的用户调研时,我们采访的几乎每个人都表示他们想要更快的去卷积。这似乎是一个主要的痛点。现在我们知道原因了。它既非常普遍,又非常慢。
在此大小的单个块上运行去卷积大约需要 2-4 分钟,而单个数据集中我们有数百个块。多核并行化在这里可以提供一些帮助,但这个问题也可能适合 GPU 加速。类似的操作通常在 GPU 上有 100 倍的速度提升。这可能比扩展到大型分布式集群更具实用性。
下一步是什么?
这个实验….
- 提供了一个示例,其他成像科学家可以复制和修改该示例,以便有效地结合使用 Dask 和 ITK。
-
突出了改进领域,不同库的开发者可以在这些领域努力,以消除未来的一些粗糙的交互点。
值得注意的是,Dask 在 Scipy 生态系统中已经与许多库做到了这一点,包括 Pandas、Scikit-Image、Scikit-Learn 等。
我们也将继续进行我们的成像实验,同时这些技术问题将在后台得到解决。接下来是分割!
博文评论由 Disqus 提供支持