DataFrame 处理操作 API¶
arcticdb.LazyDataFrame ¶
继承自: QueryBuilder
Lazy DataFrame 实现,允许在实际执行读取之前添加查询链。当 lazy=True
时,由 Library.read
、Library.head
和 Library.tail
调用返回。
另请参阅
有关支持的查询操作,请参阅 QueryBuilder。
示例
>>>
# Specify that we want version 0 of "test" symbol, and to only return the "new_column" column in the output
>>> lazy_df = lib.read("test", as_of=0, columns=["new_column"], lazy=True)
# Perform a filtering operation
>>> lazy_df = lazy_df[lazy_df["col1"].isin(0, 3, 6, 9)]
# Create a new column through a projection operation
>>> lazy_df["new_col"] = lazy_df["col1"] + lazy_df["col2"]
# Actual read and processing happens here
>>> df = lazy_df.collect().data
方法 | 描述 |
---|---|
collect |
读取数据并执行自读取调用以来应用于此对象的任何查询。 |
collect ¶
collect() -> VersionedItem
读取数据并执行自读取调用以来应用于此对象的任何查询。
返回 | 描述 |
---|---|
VersionedItem
|
包含 .data 和 .metadata 元素的对象的。 |
arcticdb.LazyDataFrameCollection ¶
继承自: QueryBuilder
用于批量操作的 Lazy DataFrame 实现。允许在实际执行读取之前添加查询链。应用于此对象的查询将应用于所有正在读取的 symbol。如果需要针对每个 symbol 进行查询,可以使用 split 将此类分解为 LazyDataFrame 对象的列表。当 lazy=True
时,由 Library.read_batch
调用返回。
另请参阅
有关支持的查询操作,请参阅 QueryBuilder。
示例
>>>
# Specify that we want the latest version of "test_0" symbol, and version 0 of "test_1" symbol
>>> lazy_dfs = lib.read_batch(["test_0", ReadRequest("test_1", as_of=0)], lazy=True)
# Perform a filtering operation on both the "test_0" and "test_1" symbols
>>> lazy_dfs = lazy_dfs[lazy_dfs["col1"].isin(0, 3, 6, 9)]
# Perform a different projection operation on each symbol
>>> lazy_dfs = lazy_dfs.split()
>>> lazy_dfs[0].apply("new_col", lazy_dfs[0]["col1"] + 1)
>>> lazy_dfs[1].apply("new_col", lazy_dfs[1]["col1"] + 2)
# Bring together again and perform the same filter on both symbols
>>> lazy_dfs = LazyDataFrameCollection(lazy_dfs)
>>> lazy_dfs = lazy_dfs[lazy_dfs["new_col"] > 0]
# Actual read and processing happens here
>>> res = lazy_dfs.collect()
方法 | 描述 |
---|---|
__init__ |
将 LazyDataFrame 列表收集到一个可以一起收集的单个对象中。 |
collect |
读取数据并执行自 read_batch 调用以来应用于此对象的任何查询。 |
split |
将集合分离为 LazyDataFrame 列表,包括已应用于此对象的任何查询。 |
__init__ ¶
__init__(lazy_dataframes: List[LazyDataFrame])
将 LazyDataFrame 列表收集到一个可以一起收集的单个对象中。
参数 | 描述 |
---|---|
lazy_dataframes
|
要收集在一起的 LazyDataFrame 集合。
TYPE: |
collect ¶
collect() -> List[Union[VersionedItem, DataError]]
读取数据并执行自 read_batch 调用以来应用于此对象的任何查询。
返回 | 描述 |
---|---|
List[Union[VersionedItem, DataError]]
|
请参阅 Library.read_batch 的文档。 |
split ¶
split() -> List[LazyDataFrame]
将集合分离为 LazyDataFrame 列表,包括已应用于此对象的任何查询。
返回 | 描述 |
---|---|
List[LazyDataFrame]
|
|
arcticdb.QueryBuilder ¶
构建一个用于处理读取结果的查询。语法设计得类似于 Pandas
q = adb.QueryBuilder()
q = q[q["a"] < 5] (equivalent to q = q[q.a < 5] provided the column name is also a valid Python variable name)
dataframe = lib.read(symbol, query_builder=q).data
有关 Group By 和 Aggregation 功能,请参阅 groupby
的文档。有关投影功能,请参阅 apply
方法的文档。
投影或过滤时支持的算术运算
- 二元算术: +, -, *, /
- 一元算术: -, abs
支持的过滤操作
-
isna, isnull, notna 和 notnull - 返回指定列为/不为 NaN 或 None 的所有行。isna 等同于 isnull,notna 等同于 notnull,即对于同时支持 NaN 和 None 值的列类型(例如字符串),不区分 NaN 和 None 值。例如
q = q[q["col"].isna()]
-
二元比较: <, <=, >, >=, ==, !=
- 一元非: ~
- 二元组合器: &, |, ^
- 列表成员资格: isin, isnotin (也可以通过 == 和 != 访问)
isin/isnotin 接受 list, set, frozenset, 1D ndarray 或 *args 解包。例如
l = [1, 2, 3]
q.isin(l)
等同于...
q.isin(1, 2, 3)
可以直接对布尔列进行过滤
q = adb.QueryBuilder()
q = q[q["boolean_column"]]
并可以直观地与其他操作结合
q = adb.QueryBuilder()
q = q[(q["boolean_column_1"] & ~q["boolean_column_2"]) & (q["numeric_column"] > 0)]
这些表达式的任意组合都是可能的,例如
q = q[(((q["a"] * q["b"]) / 5) < (0.7 * q["c"])) & (q["b"] != 12)]
更多用法示例请参阅 tests/unit/arcticdb/version_store/test_filtering.py。
时间戳过滤¶
支持 pandas.Timestamp, datetime.datetime, pandas.Timedelta 和 datetime.timedelta 对象。请注意,在内部所有这些类型都转换为纳秒(在 Timestamp/datetime 的情况下自 epoch 以来)。这意味着诸如将两个时间相乘等无意义的操作是允许的(但不鼓励)。
限制¶
字符串相等/不等(以及 isin/isnotin)仅支持可打印的 ASCII 字符。虽然不禁止,但不建议将 ==, !=, isin 或 isnotin 用于浮点值。
异常¶
inf 或 -inf 值用于比较 查询中涉及的列是 Categorical Symbol 已序列化 查询中涉及的列在 symbol 中不存在 查询涉及使用 <, <=, >, 或 >= 运算符比较字符串 查询涉及将字符串与一个或多个数值进行比较,反之亦然 查询涉及对包含字符串的列进行算术运算
方法 | 描述 |
---|---|
apply |
Apply 允许使用支持的 QueryBuilder 数值操作创建新列。有关更多信息,请参阅 |
concat |
连接 symbols 列表。应是提供给以下之一的 QueryBuilder 中的第一个子句 |
date_range |
要读取数据的 DateRange。仅适用于具有 DateTime 索引的 Pandas 数据。仅返回数据中落在给定范围内的一部分。 |
groupby |
按列名对 symbol 进行分组。GroupBy 操作后必须跟一个聚合运算符。目前支持以下五种聚合运算符: |
head |
过滤掉所有数据,只保留前 n 行。如果 n 为负数,则返回除最后 n 行之外的所有行。 |
optimise_for_memory |
在查询期间降低峰值内存使用,但会牺牲一些性能。 |
optimise_for_speed |
尽可能快地处理查询(默认行为) |
prepend |
在此 QueryBuilder 已定义的任何处理之前,应用其他 QueryBuilder 中指定的处理。 |
resample |
在索引上对 symbol 进行重采样。symbol 必须是 datetime 索引。重采样操作后必须跟一个聚合运算符。 |
row_range |
要读取数据的行范围。包含下界,不包含上界。 |
tail |
过滤掉所有数据,只保留最后 n 行。如果 n 为负数,则返回除前 n 行之外的所有行。 |
then |
在此 QueryBuilder 已定义的任何处理之后,应用其他 QueryBuilder 中指定的处理。 |
apply ¶
apply(name, expr)
Apply 允许使用支持的 QueryBuilder 数值操作创建新列。有关支持的表达式的更多信息,请参阅 QueryBuilder 类的文档 - 任何在过滤器中有效的表达式在使用 apply
时都有效。
参数 | 描述 |
---|---|
name
|
要创建的列的名称
|
expr
|
表达式
|
示例
>>> df = pd.DataFrame(
{
"VWAP": np.arange(0, 10, dtype=np.float64),
"ASK": np.arange(10, 20, dtype=np.uint16),
"VOL_ACC": np.arange(20, 30, dtype=np.int32),
},
index=np.arange(10),
)
>>> lib.write("expression", df)
>>> q = adb.QueryBuilder()
>>> q = q.apply("ADJUSTED", q["ASK"] * q["VOL_ACC"] + 7)
>>> lib.read("expression", query_builder=q).data
VOL_ACC ASK VWAP ADJUSTED
0 20 10 0.0 207
1 21 11 1.0 238
2 22 12 2.0 271
3 23 13 3.0 306
4 24 14 4.0 343
5 25 15 5.0 382
6 26 16 6.0 423
7 27 17 7.0 466
8 28 18 8.0 511
9 29 19 9.0 558
返回 | 描述 |
---|---|
QueryBuilder
|
修改后的 QueryBuilder 对象。 |
concat ¶
concat(join: str = 'outer')
连接 symbols 列表。应是提供给 NativeVersionStore.batch_read_and_join 或 Library.read_batch_and_join 的 QueryBuilder 中的第一个子句。
参数 | 描述 |
---|---|
join
|
输入 symbol 的列是否应进行 inner 或 outer join。支持的输入包括 "inner" 和 "outer"。 * inner - 只有所有输入 symbol 中都存在的列才会出现在返回的 DataFrame 中。 * outer - 任何输入 symbol 中存在的列都会出现在返回的 DataFrame 中。某些输入 symbol 中存在但其他 symbol 中不存在的列将根据其类型使用与动态 schema 相同的规则进行回填。
TYPE: |
返回 | 描述 |
---|---|
QueryBuilder
|
修改后的 QueryBuilder 对象。 |
引发异常 | 描述 |
---|---|
ArcticNativeException
|
join 参数不是 "inner" 或 "outer" 之一 |
示例
将 2 个 symbol 连接在一起,不进行任何预处理或后处理。
>>> df0 = pd.DataFrame(
{
"col1": [0.5],
"col2": [1],
},
index=[pd.Timestamp("2025-01-01")],
)
>>> df1 = pd.DataFrame(
{
"col3": ["hello"],
"col2": [2],
},
index=[pd.Timestamp("2025-01-02")],
)
>>> q = adb.QueryBuilder()
>>> q = q.concat("outer")
>>> lib.write("symbol0", df0)
>>> lib.write("symbol1", df1)
>>> lib.batch_read_and_join(["symbol0", "symbol1"], query_builder=q).data
col1 col2 col3
2025-01-01 00:00:00 0.5 1 None
2025-01-02 00:00:00 NaN 2 "hello"
>>> q = adb.QueryBuilder()
>>> q = q.concat("inner")
>>> lib.batch_read_and_join(["symbol0", "symbol1"], query_builder=q).data
col2
2025-01-01 00:00:00 1
2025-01-02 00:00:00 2
date_range ¶
date_range(date_range: DateRangeInput)
要读取数据的 DateRange。仅适用于具有 DateTime 索引的 Pandas 数据。仅返回落在给定范围内的数据部分。如果这是唯一应用的 processing 子句,则返回的数据对象将比直接将 date_range 作为参数传递给 read 方法使用更少的内存,代价是可能会稍微慢一些。
参数 | 描述 |
---|---|
date_range
|
与 read 方法接受的格式相同的日期范围。
TYPE: |
示例
>>> q = adb.QueryBuilder()
>>> q = q.date_range((pd.Timestamp("2000-01-01"), pd.Timestamp("2001-01-01")))
返回 | 描述 |
---|---|
QueryBuilder
|
修改后的 QueryBuilder 对象。 |
groupby ¶
groupby(name: str)
按列名对 symbol 进行分组。GroupBy 操作后必须跟一个聚合运算符。目前支持以下五种聚合运算符:
- "mean" - 计算组的平均值
- "sum" - 计算组的总和
- "min" - 计算组的最小值
- "max" - 计算组的最大值
- "count" - 计算组的数量
有关使用示例,请参见下文。
参数 | 描述 |
---|---|
name
|
要分组的列名。注意,目前 GroupBy 仅支持单列分组。
TYPE: |
示例
两个组的平均值 (mean)
>>> df = pd.DataFrame(
{
"grouping_column": ["group_1", "group_1", "group_1", "group_2", "group_2"],
"to_mean": [1.1, 1.4, 2.5, np.nan, 2.2],
},
index=np.arange(5),
)
>>> q = adb.QueryBuilder()
>>> q = q.groupby("grouping_column").agg({"to_mean": "mean"})
>>> lib.write("symbol", df)
>>> lib.read("symbol", query_builder=q).data
to_mean
group_1 1.666667
group_2 2.2
一个组的最大值
>>> df = pd.DataFrame(
{
"grouping_column": ["group_1", "group_1", "group_1"],
"to_max": [1, 5, 4],
},
index=np.arange(3),
)
>>> q = adb.QueryBuilder()
>>> q = q.groupby("grouping_column").agg({"to_max": "max"})
>>> lib.write("symbol", df)
>>> lib.read("symbol", query_builder=q).data
to_max
group_1 5
最大值和平均值
>>> df = pd.DataFrame(
{
"grouping_column": ["group_1", "group_1", "group_1"],
"to_mean": [1.1, 1.4, 2.5],
"to_max": [1.1, 1.4, 2.5]
},
index=np.arange(3),
)
>>> q = adb.QueryBuilder()
>>> q = q.groupby("grouping_column").agg({"to_max": "max", "to_mean": "mean"})
>>> lib.write("symbol", df)
>>> lib.read("symbol", query_builder=q).data
to_max to_mean
group_1 2.5 1.666667
一列的最小值和最大值,另一列的平均值
>>> df = pd.DataFrame(
{
"grouping_column": ["group_1", "group_1", "group_1", "group_2", "group_2"],
"agg_1": [1, 2, 3, 4, 5],
"agg_2": [1.1, 1.4, 2.5, np.nan, 2.2],
},
index=np.arange(5),
)
>>> q = adb.QueryBuilder()
>>> q = q.groupby("grouping_column")
>>> q = q.agg({"agg_1_min": ("agg_1", "min"), "agg_1_max": ("agg_1", "max"), "agg_2": "mean"})
>>> lib.write("symbol", df)
>>> lib.read("symbol", query_builder=q).data
agg_1_min agg_1_max agg_2
group_1 1 3 1.666667
group_2 4 5 2.2
返回 | 描述 |
---|---|
QueryBuilder
|
修改后的 QueryBuilder 对象。 |
head ¶
head(n: int = 5)
过滤掉所有数据,只保留前 n 行。如果 n 为负数,则返回除最后 n 行之外的所有行。
参数 | 描述 |
---|---|
n
|
如果为非负数,则选择的行数;否则为要排除的行数。
TYPE: |
返回 | 描述 |
---|---|
QueryBuilder
|
修改后的 QueryBuilder 对象。 |
optimise_for_memory ¶
optimise_for_memory()
在查询期间降低峰值内存使用,但会牺牲一些性能。
已应用的优化
- 在处理流水线中,从存储读取的段中存在的字符串所使用的内存(这些字符串在最终呈现给用户的 DataFrame 中不需要)会更早地被回收。
prepend ¶
prepend(other)
在此 QueryBuilder 已定义的任何处理之前,应用其他 QueryBuilder 中指定的处理。
参数 | 描述 |
---|---|
other
|
在此 QueryBuilder 之前在处理流水线中应用的 QueryBuilder。
|
返回 | 描述 |
---|---|
QueryBuilder
|
修改后的 QueryBuilder 对象。 |
resample ¶
resample(
rule: Union[str, DateOffset],
closed: Optional[str] = None,
label: Optional[str] = None,
offset: Optional[Union[str, Timedelta]] = None,
origin: Union[str, Timestamp] = "epoch",
)
在索引上对 symbol 进行重采样。symbol 必须是 datetime 索引。重采样操作后必须跟一个聚合运算符。目前支持以下 7 种聚合运算符:
- "mean" - 计算组的平均值
- "sum" - 计算组的总和
- "min" - 计算组的最小值
- "max" - 计算组的最大值
- "count" - 计算组的数量
- "first" - 计算组中的第一个值
- "last" - 计算组中的最后一个值
注意,并非所有聚合器都支持所有列类型
- 数值列 - 支持所有聚合器
- 布尔列 - 支持所有聚合器
- 字符串列 - 支持 count, first 和 last 聚合器
- Datetime 列 - 支持除 sum 外的所有聚合器
请注意,symbol 中不包含索引值的时间桶将不会包含在返回的 DataFrame 中。这与 Pandas 的默认行为不同。重采样目前不支持以下情况:
- 动态 schema,其中聚合列在一个或多个行切片中缺失。
- 稀疏数据。
重采样结果与 pandas resample 的 origin="epoch" 匹配。我们计划在未来版本中添加一个 'origin' 参数,并将默认值更改为 '"start_day"' 以匹配 Pandas 的默认值。这将在 rule 不是 24 小时倍数的情况下改变结果。
参数 | 描述 |
---|---|
rule
|
对数据进行重采样的频率。支持的规则字符串包括 ns, us, ms, s, min, h, 和 D,以及它们的倍数/组合,例如 1h30min。也接受表示此集合中频率的 pd.DataOffset 对象。
TYPE: |
closed
|
每个时间桶的哪个边界是闭合的。必须是 'left' 或 'right' 之一。如果未提供,则对于目前支持的所有频率,默认值为 left。
TYPE: |
label
|
每个时间桶的哪个边界用作返回 DataFrame 中的索引值。必须是 'left' 或 'right' 之一。如果未提供,则对于目前支持的所有频率,默认值为 left。
TYPE: |
offset
|
偏移每个桶的开始。支持的字符串与 pd.Timedelta 中的相同。如果 offset 大于 rule,则使用 offset 对 rule 取模作为偏移量。
TYPE: |
origin
|
用于调整分组的时间戳。支持的字符串有:
TYPE: |
返回 | 描述 |
---|---|
QueryBuilder
|
修改后的 QueryBuilder 对象。 |
引发异常 | 描述 |
---|---|
ArcticDbNotYetImplemented
|
提供给 rule 参数的频率字符串或 Pandas DateOffset 对象不在上面列出的支持频率范围内。 |
ArcticNativeException
|
closed 或 label 参数不是 "left" 或 "right" 之一 |
SchemaException
|
在调用 read 时如果满足以下条件则引发:
|
UserInputException
|
|
示例
将两小时的分钟级数据重采样为小时级数据,并对列 'to_sum' 求和
>>> df = pd.DataFrame(
{
"to_sum": np.arange(120),
},
index=pd.date_range("2024-01-01", freq="min", periods=120),
)
>>> q = adb.QueryBuilder()
>>> q = q.resample("h").agg({"to_sum": "sum"})
>>> lib.write("symbol", df)
>>> lib.read("symbol", query_builder=q).data
to_sum
2024-01-01 00:00:00 1770
2024-01-01 01:00:00 5370
同上,但指定每个时间桶的闭合边界是右侧,并按右边界标记输出
>>> q = adb.QueryBuilder()
>>> q = q.resample("h", closed="right", label="right").agg({"to_sum": "sum"})
>>> lib.read("symbol", query_builder=q).data
to_sum
2024-01-01 00:00:00 0
2024-01-01 01:00:00 1830
2024-01-01 02:00:00 5310
聚合中省略 Nones, NaNs 和 NaTs
>>> df = pd.DataFrame(
{
"to_mean": [1.0, np.nan, 2.0],
},
index=pd.date_range("2024-01-01", freq="min", periods=3),
)
>>> q = adb.QueryBuilder()
>>> q = q.resample("h").agg({"to_mean": "mean"})
>>> lib.write("symbol", df)
>>> lib.read("symbol", query_builder=q).data
to_mean
2024-01-01 00:00:00 1.5
输出列名可以通过传递给 agg 的字典格式来控制
>>> df = pd.DataFrame(
{
"agg_1": [1, 2, 3, 4, 5],
"agg_2": [1.0, 2.0, 3.0, np.nan, 5.0],
},
index=pd.date_range("2024-01-01", freq="min", periods=5),
)
>>> q = adb.QueryBuilder()
>>> q = q.resample("h")
>>> q = q.agg({"agg_1_min": ("agg_1", "min"), "agg_1_max": ("agg_1", "max"), "agg_2": "mean"})
>>> lib.write("symbol", df)
>>> lib.read("symbol", query_builder=q).data
agg_1_min agg_1_max agg_2
2024-01-01 00:00:00 1 5 2.75
row_range ¶
row_range(row_range: Tuple[int, int])
要读取数据的行范围。包含下界,不包含上界。行为应与 df.iloc[start:end] 相同,包括处理负数 start/end 值。
参数 | 描述 |
---|---|
row_range
|
要读取数据的行范围。包含下界,不包含上界。
TYPE: |
返回 | 描述 |
---|---|
QueryBuilder
|
修改后的 QueryBuilder 对象。 |
tail ¶
tail(n: int = 5)
过滤掉所有数据,只保留最后 n 行。如果 n 为负数,则返回除前 n 行之外的所有行。
参数 | 描述 |
---|---|
n
|
如果为非负数,则选择的行数;否则为要排除的行数。
TYPE: |
返回 | 描述 |
---|---|
QueryBuilder
|
修改后的 QueryBuilder 对象。 |
then ¶
then(other)
在此 QueryBuilder 已定义的任何处理之后,应用其他 QueryBuilder 中指定的处理。
参数 | 描述 |
---|---|
other
|
在此 QueryBuilder 之后在处理流水线中应用的 QueryBuilder。
|
返回 | 描述 |
---|---|
QueryBuilder
|
修改后的 QueryBuilder 对象。 |