从 Dask 中提取 fsspec
作者:Martin Durant
TL;DR
fsspec
作为 Dask、Intake、s3fs、gcsfs 等中文件系统操作的新基础,现已作为独立接口和开发新后端及文件操作的中心位置提供。虽然它作为 Dask 的一部分开发,但您不再需要 Dask 来使用此功能。
引言
过去几年里,Dask 的 IO 能力逐步有机地发展,涵盖了多种文件格式,并能够无缝访问各种远程/云数据系统上的数据。这通过多个将云资源视为文件系统的配套包以及 dask.bytes
中的专用代码实现。其中一些存储后端,特别是 s3fs
,在 Dask 之外也立即变得有用,并被 pandas
、xarray
和其他包作为可选依赖项采用。
为了整合各种后端的行为,为任何新后端提供单一参考规范,并即使没有 Dask 也提供这套文件系统操作,我创建了 fsspec
。上周,Dask 改为直接使用 fsspec
来满足其 IO 需求,我想在此详细描述这一变化带来的好处。
虽然这最初是为了减轻维护负担而做的,但重要的意义在于我们希望让文件系统操作能够轻松地供整个 pydata 生态系统使用,无论是否使用 Dask。
历史
我编写的第一个文件系统是 hdfs3
,它是 libhdfs3
C 库的一个薄封装。当时,Dask 已经具备了在分布式集群上运行的能力,而 HDFS 是这些场景中最流行的存储解决方案(至少在商业领域),因此需要一个解决方案。Python API 与 C API 紧密匹配,后者又遵循了 Java API 和 posix 标准。幸运的是,Python 已经有了 类文件标准,所以提供实现该标准的对象就足以使远程字节可供许多包使用。
很快,云资源的重要性至少与集群内部文件系统一样大,因此紧随其后的是 s3fs、adlfs 和 gcsfs。每个都遵循相同的模式,但针对特定接口编写了一些特定代码,并根据先前接口的经验进行了改进。在此期间,Dask 的需求也因 parquet 等更复杂的文件格式而演变。用于连接不同后端并调整其方法的代码最终进入了 Dask 仓库。
与此同时,其他文件系统接口也出现了,特别是 pyarrow
的,它有自己的 HDFS 实现和直接的 parquet 读取功能。但我们希望生态系统中的所有工具都能很好地协同工作,以便 Dask 可以使用任何一种引擎从任何存储后端读取 parquet。
代码重复
复制一个接口,对其进行调整并发布,就像我对文件系统的每次迭代所做的那样,这无疑是快速完成工作的方法。然而,当您想要更改行为或添加新功能时,结果是您需要在每个地方重复工作(违反了 DRY 原则)或者让接口慢慢分化。很好的例子是 glob
和 walk
,前者支持各种选项,而后者返回不同的东西(列表、版本目录/文件迭代器)。
>>> fs = dask.bytes.local.LocalFileSystem()
>>> fs.walk('/home/path/')
<iterator of tuples>
>>> fs = s3fs.S3FileSystme()
>>> fs.walk('bucket/path')
[list of filenames]
我们发现,为了满足 Dask 的需求,我们需要构建小型包装类,以确保所有后端具有兼容的 API,以及一个用于以相同接口操作本地文件系统的类,最后还有一个包含各种辅助函数的注册表来管理所有这些。这其中很少有 Dask 特有的内容,只有少数几个函数涉及图构建和延迟执行。然而,它确实提出了一个重要问题:文件系统应该是可序列化的,并且应该有一种指定要打开的文件的方式,这种方式也应该是可序列化的(理想情况下还支持透明的文本和压缩)。
新文件系统
我之前已经提到了努力创建一个本地文件系统类,使其与已存在的其他文件系统具有相同的接口。但是 Dask 用户(以及其他人)可能需要更多选项,例如 ssh、ftp、http、内存文件系统等等。根据用户的请求处理这些选项,我们开始编写更多的文件系统接口,所有这些都位于 dask.bytes
内部;但当时不清楚它们是只应支持非常基本的功能,仅够 Dask 使用,还是应支持完整的文件操作集。
特别是在内存文件系统方面,存在一个极其长期的 PR 中——当每个 worker 都有自己的内存,因此看到的是“文件系统”的不同状态时,尚不清楚这样的东西对 Dask 有多大用处。
整合
file system Spec(后来称为 fsspec
)的诞生源于希望规范化和整合存储后端的行为、减少重复,并为所有后端提供相同的功能。在此过程中,编写新的实现类变得容易得多:参见 实现,其中包括有趣且高度实验性的选项,例如 CachingFileSystem
,它会为每次远程读取创建本地副本,以便第二次访问更快。然而,更重要的主流实现也逐渐成形,例如 FTP、SSH、内存文件系统和 webHDFS(后者是在解决了使用 hdfs3
构建和认证的所有问题之后,从集群外部访问 HDFS 的最佳选择)。
此外,新的仓库提供了实现新功能的机会,这些新功能将比仅在选定仓库中实现具有更广泛的适用性。示例包括 FUSE 挂载、文件系统上的字典式键值视图(例如 zarr 所使用的),以及文件的事务性写入。所有文件系统都是可序列化的,并兼容 pyarrow。
用途
最终我意识到文件系统类提供的操作对于不使用 Dask 的人也非常有用。实际上,例如 s3fs
就有很多独立使用或与 fastparquet 等能够接受文件系统函数的库或 pandas 结合使用的场景。
因此,创建一个特定的仓库来编写 Dask 兼容的文件系统应遵守的规范似乎是合理的,我发现我可以从现有实现中提取出许多通用行为,将只存在于部分实现中的功能提供给所有实现,并在整个过程中普遍改进每一个实现。
然而,当将 fsspec
与 Intake 结合考虑时,我才意识到一个独立的文件系统包的通用性有多大:该 PR 实现了一个通用的文件选择器,可以浏览我们可用的任何文件系统中的文件,甚至能够将 S3 上的远程 zip 文件视为可浏览的文件系统。请注意,与本博客的总体思路类似,文件选择器本身不必存在于 Intake 仓库中,最终会成为一个独立的项目,或成为 fsspec
的一个可选功能。您不应该仅仅为了获得通用的文件系统操作而需要 Intake。
总结
这项工作尚未达到“协议标准”的水平,例如众所周知的 python 缓冲区协议,但我认为这是让各种存储服务中的数据可供人们使用的有用一步,因为您可以使用相同的 API 操作每个服务,预期相同的行为,并创建真实的 Python 类文件对象传递给其他函数。拥有这样一个单一的中央仓库提供了一个显而易见的地方来讨论和修改规范,并在此基础上构建额外功能。
许多改进仍待完成,例如在更多函数中支持 glob 字符串,或者创建一个可以根据提供的 URL 形式分派到不同后端的单一文件系统;但现在所有这些都有了一个显而易见的地方可以进行。
博客评论由 Disqus 提供