Dask DataFrame 为几种流行的表格数据格式(如 CSV 和 Parquet)提供了专门的 IO 函数。如果您使用的是受支持的格式,那么相应的函数(例如 read_csv)很可能是创建新的 Dask DataFrame 集合的最可靠方法。对于其他工作流程,from_map 现在提供了一种方便的方法,可以将 DataFrame 集合定义为任意函数映射。虽然这些类型的工作流程历来要求用户采用 Dask Delayed API,但 from_map 现在使自定义集合创建变得更容易且性能更高。

什么是 from_map?

from_map API 已添加到 Dask DataFrame 的 v2022.05.1 版本中,旨在取代 from_delayed 成为自定义 DataFrame 创建的推荐方法。其核心是,from_map 简单地使用一个通用函数(func)将可迭代对象(inputs)的每个元素转换为一个独立的 Dask DataFrame 分区。

dd.from_map(func: Callable, iterable: Iterable) -> dd.DataFrame

整体行为本质上是标准 Python 的 map 函数在 Dask DataFrame 中的对应物

map(func: Callable, iterable: Iterable) -> Iterator

请注意,from_mapmap 都实际上支持任意数量的可迭代输入。但是,本文中我们只关注单个可迭代参数的使用。

一个简单的例子

为了更好地理解 from_map 的行为,让我们考虑一个简单的例子:我们想与使用以下 Pandas 代码创建的 Feather 格式数据进行交互

import pandas as pd

size = 3
paths = ["./data.0.feather", "./data.1.feather"]
for i, path in enumerate(paths):
    index = range(i * size, i * size + size)
    a = [i] * size
    b = list("xyz")
    df = pd.DataFrame({"A": a, "B": b, "index": index})
    df.to_feather(path)

由于 Dask 尚未提供专门的 read_feather 函数(截至 dask-2023.3.1),大多数用户可能会认为创建 Dask DataFrame 集合的唯一选项是使用 dask.delayed。然而,在这种情况下创建集合的“最佳实践”是将 pd.read_feathercudf.read_feather 封装在 from_map 调用中,如下所示

>>> import dask.dataframe as dd
>>> ddf = dd.from_map(pd.read_feather, paths)
>>> ddf
Dask DataFrame Structure:
                   A       B  index
npartitions=2
               int64  object  int64
                 ...     ...    ...
                 ...     ...    ...
Dask Name: read_feather, 1 graph layer

计算后产生以下 Pandas(或 cuDF)对象

>>> ddf.compute()
   A  B  index
0  0  x      0
1  0  y      1
2  0  z      2
0  1  x      3
1  1  y      4
2  1  z      5

尽管使用传统的 dd.from_delayed 策略也可以实现相同的输出,但使用 from_map 将改善 Dask 内部分布式任务图优化的机会。

性能考量:指定 metadivisions

虽然 funciterablefrom_map 唯一必需的参数,但通过指定可选参数如 metadivisions,可以显著提高工作流程的整体性能。

由于 Dask DataFrame 的惰性特性,每个集合都需要携带一个 schema(列名和 dtype 信息),形式为空的 Pandas(或 cuDF)对象。如果未直接向 from_map 函数提供 meta,则需要通过急切地实例化第一个分区来填充 schema,这会增加 from_map API 调用本身的明显延迟。因此,如果事先知道预期的列名和 dtypes,始终建议指定一个显式的 meta 参数。

虽然传入 meta 参数可能会减少from_map API 调用的延迟,但传入 divisions 参数可以减少端到端计算时间。这是因为,通过指定 divisions,我们允许 Dask DataFrame 跟踪每个分区的有用 min/max 统计信息。因此,如果整体工作流程涉及按索引分组或连接,Dask 可以避免执行不必要的混洗操作。

使用 from_map 实现自定义 API

尽管目前很难自动从任意 Feather 数据集的元数据中提取分区信息,但 from_map 使得使用 PyArrow 实现自己的高功能 read_feather API 变得相对容易。例如,以下代码是启用具有列投影和索引选择的惰性 Feather IO 所需的全部内容

def from_arrow(table):
    """(Optional) Utility to enforce 'backend' configuration"""
    from dask import config

    if config.get("dataframe.backend") == "cudf":
        import cudf

        return cudf.DataFrame.from_arrow(table)
    else:
        return table.to_pandas()


def read_feather(paths, columns=None, index=None):
    """Create a Dask DataFrame from Feather files

    Example of a "custom" `from_map` IO function

    Parameters
    ----------
    paths: list
        List of Feather-formatted paths. Each path will
        be mapped to a distinct DataFrame partition.
    columns: list or None, default None
        Optional list of columns to select from each file.
    index: str or None, default None
        Optional column name to set as the DataFrame index.

    Returns
    -------
    dask.dataframe.DataFrame
    """
    import dask.dataframe as dd
    import pyarrow.dataset as ds

    # Step 1: Extract `meta` from the dataset
    dataset = ds.dataset(paths, format="feather")
    meta = from_arrow(dataset.schema.empty_table())
    meta = meta.set_index(index) if index else meta
    columns = columns or list(meta.columns)
    meta = meta[columns]

    # Step 2: Define the `func` argument
    def func(frag, columns=None, index=None):
        # Create a Pandas DataFrame from a dataset fragment
        # NOTE: In practice, this function should
        # always be defined outside `read_feather`
        assert columns is not None
        read_columns = columns
        if index and index not in columns:
            read_columns = columns + [index]
        df = from_arrow(frag.to_table(columns=read_columns))
        df = df.set_index(index) if index else df
        return df[columns] if columns else df

    # Step 3: Define the `iterable` argument
    iterable = dataset.get_fragments()

    # Step 4: Call `from_map`
    return dd.from_map(
        func,
        iterable,
        meta=meta,
        index=index,  # `func` kwarg
        columns=columns,  # `func` kwarg
    )

这里我们看到,使用 from_map 启用完全惰性的集合创建仅需要四个步骤。首先,我们使用 pyarrow.datasetfrom_map 定义一个 meta 参数,以便避免急切读取操作的不必要开销。对于某些文件格式和/或应用程序,此时也可能计算 divisions。然而,如上所述,此类信息对于本例不易获取。

第二步是定义我们将用于生成每个最终 DataFrame 分区的底层函数(func)。第三步,我们定义一个或多个包含生成每个分区所需唯一信息的可迭代对象(iterable)。在本例中,唯一的可迭代对象对应于 pyarrow.dataset 片段的生成器,它本质上是输入路径列表的包装器。

第四步也是最后一步是使用最终的 funciterablemeta 信息调用 from_map API。请注意,我们还利用此机会指定额外的关键字参数,例如 columnsindex。与始终映射到 func 的可迭代位置参数不同,这些关键字参数将被广播。

使用上面实现的read_feather,将任意 Feather 数据集转换为惰性 Dask DataFrame 集合变得既简单又高效

>>> ddf = read_feather(paths, columns=["A"], index="index")
>>> ddf
Dask DataFrame Structure:
                   A
npartitions=2
               int64
                 ...
                 ...
Dask Name: func, 1 graph layer
>>> ddf.compute()
       A
index
0      0
1      0
2      0
3      1
4      1
5      1

进阶:增强列投影

尽管上面实现的 read_feather 很可能满足大多数应用程序的基本需求,但用户在实践中很可能经常省略 column 参数。例如

a = read_feather(paths)["A"]

对于这样的代码,按照当前的实现方式,每个 IO 任务将被迫读取整个 Feather 文件,然后在将数据读入内存后,才能从 Pandas/cuDF DataFrame 中选择 ”A” 列。对于这里使用的玩具数据集来说,额外的开销微不足道。然而,避免这种不必要的 IO 可以显著提升实际应用程序的性能。

那么,我们如何修改我们的 read_feather 实现以利用外部列投影操作(例如 ddf["A"])呢?好消息是 from_map 已经具备了必要的图优化钩子来处理这个问题,只要 func 对象满足 DataFrameIOFunction 协议即可

@runtime_checkable
class DataFrameIOFunction(Protocol):
    """DataFrame IO function with projectable columns
    Enables column projection in ``DataFrameIOLayer``.
    """

    @property
    def columns(self):
        """Return the current column projection"""
        raise NotImplementedError

    def project_columns(self, columns):
        """Return a new DataFrameIOFunction object
        with a new column projection
        """
        raise NotImplementedError

    def __call__(self, *args, **kwargs):
        """Return a new DataFrame partition"""
        raise NotImplementedError

也就是说,我们所需要做的就是将我们实现的“步骤 2”更改为使用以下代码

    from dask.dataframe.io.utils import DataFrameIOFunction

    class ReadFeather(DataFrameIOFunction):
        """Create a Pandas/cuDF DataFrame from a dataset fragment"""
        def __init__(self, columns, index):
            self._columns = columns
            self.index = index

        @property
        def columns(self):
            return self._columns

        def project_columns(self, columns):
            # Replace this object with one that will only read `columns`
            if columns != self.columns:
                return ReadFeather(columns, self.index)
            return self

        def __call__(self, frag):
            # Same logic as original `func`
            read_columns = self.columns
            if index and self.index not in self.columns:
                read_columns = self.columns + [self.index]
            df = from_arrow(frag.to_table(columns=read_columns))
            df = df.set_index(self.index) if self.index else df
            return df[self.columns] if self.columns else df

    func = ReadFeather(columns, index)

结论

现在,从任意数据源创建 Dask DataFrame 集合比以往任何时候都更容易。尽管 dask.delayed API 多年来已经实现了类似的功能,但 from_map 现在使得在不牺牲 Dask DataFrame API 其余部分利用的任何高级图优化的情况下实现自定义 IO 函数成为可能。

立即开始尝试使用 from_map,并告诉我们您的体验!


博客评论由 Disqus 提供支持