深入探讨使用 from_map 创建 Dask DataFrame 集合
作者:Rick Zamora
Dask DataFrame 为几种流行的表格数据格式(如 CSV 和 Parquet)提供了专门的 IO 函数。如果您使用的是受支持的格式,那么相应的函数(例如 read_csv
)很可能是创建新的 Dask DataFrame 集合的最可靠方法。对于其他工作流程,from_map
现在提供了一种方便的方法,可以将 DataFrame 集合定义为任意函数映射。虽然这些类型的工作流程历来要求用户采用 Dask Delayed API,但 from_map
现在使自定义集合创建变得更容易且性能更高。
什么是 from_map
?
from_map
API 已添加到 Dask DataFrame 的 v2022.05.1 版本中,旨在取代 from_delayed
成为自定义 DataFrame 创建的推荐方法。其核心是,from_map
简单地使用一个通用函数(func
)将可迭代对象(inputs
)的每个元素转换为一个独立的 Dask DataFrame 分区。
dd.from_map(func: Callable, iterable: Iterable) -> dd.DataFrame
整体行为本质上是标准 Python 的 map
函数在 Dask DataFrame 中的对应物
map(func: Callable, iterable: Iterable) -> Iterator
请注意,from_map
和 map
都实际上支持任意数量的可迭代输入。但是,本文中我们只关注单个可迭代参数的使用。
一个简单的例子
为了更好地理解 from_map
的行为,让我们考虑一个简单的例子:我们想与使用以下 Pandas 代码创建的 Feather 格式数据进行交互
import pandas as pd
size = 3
paths = ["./data.0.feather", "./data.1.feather"]
for i, path in enumerate(paths):
index = range(i * size, i * size + size)
a = [i] * size
b = list("xyz")
df = pd.DataFrame({"A": a, "B": b, "index": index})
df.to_feather(path)
由于 Dask 尚未提供专门的 read_feather
函数(截至 dask-2023.3.1
),大多数用户可能会认为创建 Dask DataFrame 集合的唯一选项是使用 dask.delayed
。然而,在这种情况下创建集合的“最佳实践”是将 pd.read_feather
或 cudf.read_feather
封装在 from_map
调用中,如下所示
>>> import dask.dataframe as dd
>>> ddf = dd.from_map(pd.read_feather, paths)
>>> ddf
Dask DataFrame Structure:
A B index
npartitions=2
int64 object int64
... ... ...
... ... ...
Dask Name: read_feather, 1 graph layer
计算后产生以下 Pandas(或 cuDF)对象
>>> ddf.compute()
A B index
0 0 x 0
1 0 y 1
2 0 z 2
0 1 x 3
1 1 y 4
2 1 z 5
尽管使用传统的 dd.from_delayed
策略也可以实现相同的输出,但使用 from_map
将改善 Dask 内部分布式任务图优化的机会。
性能考量:指定 meta
和 divisions
虽然 func
和 iterable
是 from_map
唯一必需的参数,但通过指定可选参数如 meta
和 divisions
,可以显著提高工作流程的整体性能。
由于 Dask DataFrame 的惰性特性,每个集合都需要携带一个 schema(列名和 dtype 信息),形式为空的 Pandas(或 cuDF)对象。如果未直接向 from_map
函数提供 meta
,则需要通过急切地实例化第一个分区来填充 schema,这会增加 from_map
API 调用本身的明显延迟。因此,如果事先知道预期的列名和 dtypes,始终建议指定一个显式的 meta
参数。
虽然传入 meta
参数可能会减少from_map
API 调用的延迟,但传入 divisions
参数可以减少端到端计算时间。这是因为,通过指定 divisions
,我们允许 Dask DataFrame 跟踪每个分区的有用 min/max 统计信息。因此,如果整体工作流程涉及按索引分组或连接,Dask 可以避免执行不必要的混洗操作。
使用 from_map
实现自定义 API
尽管目前很难自动从任意 Feather 数据集的元数据中提取分区信息,但 from_map
使得使用 PyArrow 实现自己的高功能 read_feather
API 变得相对容易。例如,以下代码是启用具有列投影和索引选择的惰性 Feather IO 所需的全部内容
def from_arrow(table):
"""(Optional) Utility to enforce 'backend' configuration"""
from dask import config
if config.get("dataframe.backend") == "cudf":
import cudf
return cudf.DataFrame.from_arrow(table)
else:
return table.to_pandas()
def read_feather(paths, columns=None, index=None):
"""Create a Dask DataFrame from Feather files
Example of a "custom" `from_map` IO function
Parameters
----------
paths: list
List of Feather-formatted paths. Each path will
be mapped to a distinct DataFrame partition.
columns: list or None, default None
Optional list of columns to select from each file.
index: str or None, default None
Optional column name to set as the DataFrame index.
Returns
-------
dask.dataframe.DataFrame
"""
import dask.dataframe as dd
import pyarrow.dataset as ds
# Step 1: Extract `meta` from the dataset
dataset = ds.dataset(paths, format="feather")
meta = from_arrow(dataset.schema.empty_table())
meta = meta.set_index(index) if index else meta
columns = columns or list(meta.columns)
meta = meta[columns]
# Step 2: Define the `func` argument
def func(frag, columns=None, index=None):
# Create a Pandas DataFrame from a dataset fragment
# NOTE: In practice, this function should
# always be defined outside `read_feather`
assert columns is not None
read_columns = columns
if index and index not in columns:
read_columns = columns + [index]
df = from_arrow(frag.to_table(columns=read_columns))
df = df.set_index(index) if index else df
return df[columns] if columns else df
# Step 3: Define the `iterable` argument
iterable = dataset.get_fragments()
# Step 4: Call `from_map`
return dd.from_map(
func,
iterable,
meta=meta,
index=index, # `func` kwarg
columns=columns, # `func` kwarg
)
这里我们看到,使用 from_map
启用完全惰性的集合创建仅需要四个步骤。首先,我们使用 pyarrow.dataset
为 from_map
定义一个 meta
参数,以便避免急切读取操作的不必要开销。对于某些文件格式和/或应用程序,此时也可能计算 divisions
。然而,如上所述,此类信息对于本例不易获取。
第二步是定义我们将用于生成每个最终 DataFrame 分区的底层函数(func
)。第三步,我们定义一个或多个包含生成每个分区所需唯一信息的可迭代对象(iterable
)。在本例中,唯一的可迭代对象对应于 pyarrow.dataset
片段的生成器,它本质上是输入路径列表的包装器。
第四步也是最后一步是使用最终的 func
、iterable
和 meta
信息调用 from_map
API。请注意,我们还利用此机会指定额外的关键字参数,例如 columns
和 index
。与始终映射到 func
的可迭代位置参数不同,这些关键字参数将被广播。
使用上面实现的read_feather
,将任意 Feather 数据集转换为惰性 Dask DataFrame 集合变得既简单又高效
>>> ddf = read_feather(paths, columns=["A"], index="index")
>>> ddf
Dask DataFrame Structure:
A
npartitions=2
int64
...
...
Dask Name: func, 1 graph layer
>>> ddf.compute()
A
index
0 0
1 0
2 0
3 1
4 1
5 1
进阶:增强列投影
尽管上面实现的 read_feather
很可能满足大多数应用程序的基本需求,但用户在实践中很可能经常省略 column
参数。例如
a = read_feather(paths)["A"]
对于这样的代码,按照当前的实现方式,每个 IO 任务将被迫读取整个 Feather 文件,然后在将数据读入内存后,才能从 Pandas/cuDF DataFrame 中选择 ”A”
列。对于这里使用的玩具数据集来说,额外的开销微不足道。然而,避免这种不必要的 IO 可以显著提升实际应用程序的性能。
那么,我们如何修改我们的 read_feather
实现以利用外部列投影操作(例如 ddf["A"]
)呢?好消息是 from_map
已经具备了必要的图优化钩子来处理这个问题,只要 func
对象满足 DataFrameIOFunction
协议即可
@runtime_checkable
class DataFrameIOFunction(Protocol):
"""DataFrame IO function with projectable columns
Enables column projection in ``DataFrameIOLayer``.
"""
@property
def columns(self):
"""Return the current column projection"""
raise NotImplementedError
def project_columns(self, columns):
"""Return a new DataFrameIOFunction object
with a new column projection
"""
raise NotImplementedError
def __call__(self, *args, **kwargs):
"""Return a new DataFrame partition"""
raise NotImplementedError
也就是说,我们所需要做的就是将我们实现的“步骤 2”更改为使用以下代码
from dask.dataframe.io.utils import DataFrameIOFunction
class ReadFeather(DataFrameIOFunction):
"""Create a Pandas/cuDF DataFrame from a dataset fragment"""
def __init__(self, columns, index):
self._columns = columns
self.index = index
@property
def columns(self):
return self._columns
def project_columns(self, columns):
# Replace this object with one that will only read `columns`
if columns != self.columns:
return ReadFeather(columns, self.index)
return self
def __call__(self, frag):
# Same logic as original `func`
read_columns = self.columns
if index and self.index not in self.columns:
read_columns = self.columns + [self.index]
df = from_arrow(frag.to_table(columns=read_columns))
df = df.set_index(self.index) if self.index else df
return df[self.columns] if self.columns else df
func = ReadFeather(columns, index)
结论
现在,从任意数据源创建 Dask DataFrame 集合比以往任何时候都更容易。尽管 dask.delayed
API 多年来已经实现了类似的功能,但 from_map
现在使得在不牺牲 Dask DataFrame API 其余部分利用的任何高级图优化的情况下实现自定义 IO 函数成为可能。
立即开始尝试使用 from_map
,并告诉我们您的体验!
博客评论由 Disqus 提供支持