ArcticDB_demo_lazydataframe
在 Github 中查看 | 在 Google Colab 中打开ArcticDB LazyDataFrame 演示

在此演示中,我们将使用 LazyDataFrame 类探讨 ArcticDB 中可用的 DataFrame 处理选项。我们将介绍此 API 的各种可能性,包括
- 过滤
- 投影
- 分组和聚合
- 上述功能的组合
为何在 ArcticDB 中执行处理?
- 通过使用多线程的高效 C++ 实现提升性能
- 高效的数据访问 - 只读取所需数据
- 对于超大数据集,一些查询可能在内存中无法执行,但在这里是可行的
请注意,此处描述的所有操作也可以使用旧版 QueryBuilder
类来执行,但我们认为此 API 更直观!
演示设置¶
必要的包安装
!pip install arcticdb
必要的库导入
import os
import numpy as np
import pandas as pd
import random
import arcticdb as adb
from arcticdb.util.test import random_strings_of_length
在此演示中,我们将配置基于 LMDB 文件的后端。当配置对象存储后端(例如 S3)时,ArcticDB 可实现其高性能和可伸缩性。
arctic = adb.Arctic("lmdb://arcticdb_demo")
您可以拥有无限数量的库,但我们首先创建一个。
if 'sample' not in arctic.list_libraries():
# library does not already exist
arctic.create_library('sample')
lib = arctic.get_library('sample')
运行此单元格以设置预备变量。100,000 个唯一字符串对我们来说是一个病态情况,因为默认的行切片策略是每个数据段有 100,000 行,所以每个唯一字符串在此列中大约每个数据段出现一次。
ten_grouping_values = random_strings_of_length(10, 10, True)
one_hundred_thousand_grouping_values = random_strings_of_length(100_000, 10, True)
rng = np.random.RandomState()
sym_10M = "demo_10M"
sym_100M = "demo_100M"
sym_1B = "demo_1B"
选择您要使用的 symbol
- sym_10M:包含 1000 万行的 symbol
- sym_100M:包含 1 亿行的 symbol
- sym_1B:包含 10 亿行的 symbol
将要使用的 symbol 赋值给 sym 变量
- example: sym = sym_10M
sym = sym_10M
运行此单元格,根据 symbol 名称设置 DataFrame
if sym==sym_10M:
num_rows = 10_000_000
elif sym==sym_100M:
num_rows = 100_000_000
elif sym==sym_1B:
num_rows = 1_000_000_000
input_df = pd.DataFrame(
{
"grouping_column_10": list(random.choices(ten_grouping_values, k=num_rows)),
"grouping_column_100_000": list(random.choices(one_hundred_thousand_grouping_values, k=num_rows)),
"numeric_column": rng.rand((num_rows))
}
)
演示开始¶
lib.write(sym, input_df)
显示数据如何被切片并写入磁盘。
lib._nvs.read_index(sym)
显示前 100 行数据作为示例。
lib.head(sym, n=100).data
读取¶
读取 symbol,不进行任何过滤。
%%time
lib.read(sym)
大部分时间都花在了为包含 100,000 个唯一字符串的列分配 Python 字符串上,因此省略此列会快得多。
%%time
lib.read(sym, columns=["grouping_column_10", "numeric_column"])
过滤¶
请注意,数值列中的所有值都在 0 到 1 之间。因此,此查询不会过滤掉任何数据。这表明执行全表扫描不会显著影响性能。另请注意,read 调用并非即时完成,因为直到在 LazyDataFrame 上调用 collect 才会读取数据。
%%time
lazy_df = lib.read(sym, lazy=True)
lazy_df = lazy_df[lazy_df["numeric_column"] < 2.0]
%%time
lazy_df.collect()
现在我们正在过滤,只保留 symbol 中大约 10% 的行。这比直接读取更快,因为需要分配的 Python 字符串更少。
%%time
lazy_df = lib.read(sym, lazy=True)
lazy_df = lazy_df[lazy_df["numeric_column"] < 0.1]
df = lazy_df.collect().data
df
投影¶
根据现有列和常量创建新列的速度与不减少显示数据量的过滤器的速度大致相同。
%%time
lazy_df = lib.read(sym, lazy=True)
lazy_df["new_column"] = lazy_df["numeric_column"] * 2.0
df = lazy_df.collect().data
df
同样,可以使用 apply 方法获得相同的结果。
lazy_df = lib.read(sym, lazy=True)
lazy_df.apply("new_column", lazy_df["numeric_column"] * 2.0)
lazy_df.collect().data
如果在创建 LazyDataFrame
对象之前使用 apply,col
函数可以用作列名的占位符。
lazy_df = lib.read(sym, lazy=True).apply("new_column", adb.col("numeric_column") * 2.0)
lazy_df.collect().data
分组和聚合¶
由于减少了 Python 字符串分配的数量,分组操作仍然比直接读取更快,即使执行了额外的计算。
%%time
lazy_df = lib.read(sym, lazy=True)
lazy_df.groupby("grouping_column_10").agg({"numeric_column": "mean"})
df = lazy_df.collect().data
df
即使对病态大量的唯一值进行分组,也不会显著降低性能。
%%time
lazy_df = lib.read(sym, lazy=True)
lazy_df.groupby("grouping_column_100_000").agg({"numeric_column": "mean"})
df = lazy_df.collect().data
df
组合¶
这些操作可以在顺序流水线中任意组合。
%%time
lazy_df = lib.read(sym, lazy=True)
lazy_df = lazy_df[lazy_df["numeric_column"] < 0.1].apply("new_column", lazy_df["numeric_column"] * 2.0).groupby("grouping_column_10").agg({"numeric_column": "mean", "new_column": "max"})
df = lazy_df.collect().data
df
批处理操作¶
# Setup two symbols
batch_sym_1 = f'{sym}_1'
batch_sym_2 = f'{sym}_2'
syms = [batch_sym_1, batch_sym_2]
lib.write(batch_sym_1, input_df)
lib.write(batch_sym_2, input_df)
read_batch
也接受一个 lazy
参数,该参数返回一个 LazyDataFrameCollection
。
lazy_dfs = lib.read_batch(syms, lazy=True)
lazy_dfs
相同的处理操作可以应用于批处理中正在读取的所有 symbol。请注意,在单元格输出中,管道符 |
位于 LazyDataFrame
列表外部,因此 WHERE
子句应用于所有 symbol。
lazy_dfs = lazy_dfs[lazy_dfs["numeric_column"] < 0.1]
lazy_dfs
在 LazyDataFrameCollection
上调用 collect()
内部使用了 read_batch
,因此通常比序列化读取调用更高效。
dfs = lazy_dfs.collect()
dfs
dfs[0].data.head()
dfs[1].data.head()
如果需要,可以将不同的处理操作应用于批处理中的各个 symbol。
lazy_dfs = lib.read_batch(syms, lazy=True)
lazy_dfs = lazy_dfs.split()
lazy_dfs
请注意,在单元格输出中,管道符 |
现在位于 LazyDataFrame
列表内部,因此 PROJECT
子句应用于各个 symbol。
lazy_dfs[0].apply("new_column_1", 2 * adb.col("numeric_column"))
lazy_dfs[1].apply("new_column_1", 4 * adb.col("numeric_column"))
lazy_dfs = adb.LazyDataFrameCollection(lazy_dfs)
lazy_dfs
dfs = lazy_dfs.collect()
dfs
dfs[0].data
dfs[1].data
如果需要,可以直观地结合这两种操作模式。
lazy_dfs = lib.read_batch(syms, lazy=True)
lazy_dfs = lazy_dfs[lazy_dfs["numeric_column"] < 0.1]
lazy_dfs = lazy_dfs.split()
lazy_dfs[0].apply("new_column_1", 2 * adb.col("numeric_column"))
lazy_dfs[1].apply("new_column_1", 4 * adb.col("numeric_column"))
lazy_dfs = adb.LazyDataFrameCollection(lazy_dfs)
lazy_dfs = lazy_dfs[lazy_dfs["new_column_1"] < 0.1]
lazy_dfs
dfs = lazy_dfs.collect()
dfs[0].data
dfs[1].data