跳到内容

DataFrame 处理操作 API

arcticdb.LazyDataFrame

继承自: QueryBuilder

Lazy DataFrame 实现,允许在实际执行读取之前添加查询链。当 lazy=True 时,由 Library.readLibrary.headLibrary.tail 调用返回。

另请参阅

有关支持的查询操作,请参阅 QueryBuilder。

示例

>>>
# Specify that we want version 0 of "test" symbol, and to only return the "new_column" column in the output
>>> lazy_df = lib.read("test", as_of=0, columns=["new_column"], lazy=True)
# Perform a filtering operation
>>> lazy_df = lazy_df[lazy_df["col1"].isin(0, 3, 6, 9)]
# Create a new column through a projection operation
>>> lazy_df["new_col"] = lazy_df["col1"] + lazy_df["col2"]
# Actual read and processing happens here
>>> df = lazy_df.collect().data
方法 描述
collect

读取数据并执行自读取调用以来应用于此对象的任何查询。

collect

collect() -> VersionedItem

读取数据并执行自读取调用以来应用于此对象的任何查询。

返回 描述
VersionedItem

包含 .data 和 .metadata 元素的对象的。

arcticdb.LazyDataFrameCollection

继承自: QueryBuilder

用于批量操作的 Lazy DataFrame 实现。允许在实际执行读取之前添加查询链。应用于此对象的查询将应用于所有正在读取的 symbol。如果需要针对每个 symbol 进行查询,可以使用 split 将此类分解为 LazyDataFrame 对象的列表。当 lazy=True 时,由 Library.read_batch 调用返回。

另请参阅

有关支持的查询操作,请参阅 QueryBuilder。

示例

>>>
# Specify that we want the latest version of "test_0" symbol, and version 0 of "test_1" symbol
>>> lazy_dfs = lib.read_batch(["test_0", ReadRequest("test_1", as_of=0)], lazy=True)
# Perform a filtering operation on both the "test_0" and "test_1" symbols
>>> lazy_dfs = lazy_dfs[lazy_dfs["col1"].isin(0, 3, 6, 9)]
# Perform a different projection operation on each symbol
>>> lazy_dfs = lazy_dfs.split()
>>> lazy_dfs[0].apply("new_col", lazy_dfs[0]["col1"] + 1)
>>> lazy_dfs[1].apply("new_col", lazy_dfs[1]["col1"] + 2)
# Bring together again and perform the same filter on both symbols
>>> lazy_dfs = LazyDataFrameCollection(lazy_dfs)
>>> lazy_dfs = lazy_dfs[lazy_dfs["new_col"] > 0]
# Actual read and processing happens here
>>> res = lazy_dfs.collect()
方法 描述
__init__

将 LazyDataFrame 列表收集到一个可以一起收集的单个对象中。

collect

读取数据并执行自 read_batch 调用以来应用于此对象的任何查询。

split

将集合分离为 LazyDataFrame 列表,包括已应用于此对象的任何查询。

__init__

__init__(lazy_dataframes: List[LazyDataFrame])

将 LazyDataFrame 列表收集到一个可以一起收集的单个对象中。

参数 描述
lazy_dataframes

要收集在一起的 LazyDataFrame 集合。

TYPE: List[LazyDataFrame]

collect

collect() -> List[Union[VersionedItem, DataError]]

读取数据并执行自 read_batch 调用以来应用于此对象的任何查询。

返回 描述
List[Union[VersionedItem, DataError]]

请参阅 Library.read_batch 的文档。

split

split() -> List[LazyDataFrame]

将集合分离为 LazyDataFrame 列表,包括已应用于此对象的任何查询。

返回 描述
List[LazyDataFrame]

arcticdb.QueryBuilder

构建一个用于处理读取结果的查询。语法设计得类似于 Pandas

q = adb.QueryBuilder()
q = q[q["a"] < 5] (equivalent to q = q[q.a < 5] provided the column name is also a valid Python variable name)
dataframe = lib.read(symbol, query_builder=q).data

有关 Group By 和 Aggregation 功能,请参阅 groupby 的文档。有关投影功能,请参阅 apply 方法的文档。

投影或过滤时支持的算术运算

  • 二元算术: +, -, *, /
  • 一元算术: -, abs

支持的过滤操作

  • isna, isnull, notna 和 notnull - 返回指定列为/不为 NaN 或 None 的所有行。isna 等同于 isnull,notna 等同于 notnull,即对于同时支持 NaN 和 None 值的列类型(例如字符串),不区分 NaN 和 None 值。例如

    q = q[q["col"].isna()]
    

  • 二元比较: <, <=, >, >=, ==, !=

  • 一元非: ~
  • 二元组合器: &, |, ^
  • 列表成员资格: isin, isnotin (也可以通过 == 和 != 访问)

isin/isnotin 接受 list, set, frozenset, 1D ndarray 或 *args 解包。例如

l = [1, 2, 3]
q.isin(l)

等同于...

q.isin(1, 2, 3)

可以直接对布尔列进行过滤

q = adb.QueryBuilder()
q = q[q["boolean_column"]]

并可以直观地与其他操作结合

q = adb.QueryBuilder()
q = q[(q["boolean_column_1"] & ~q["boolean_column_2"]) & (q["numeric_column"] > 0)]

这些表达式的任意组合都是可能的,例如

q = q[(((q["a"] * q["b"]) / 5) < (0.7 * q["c"])) & (q["b"] != 12)]

更多用法示例请参阅 tests/unit/arcticdb/version_store/test_filtering.py。

时间戳过滤

支持 pandas.Timestamp, datetime.datetime, pandas.Timedelta 和 datetime.timedelta 对象。请注意,在内部所有这些类型都转换为纳秒(在 Timestamp/datetime 的情况下自 epoch 以来)。这意味着诸如将两个时间相乘等无意义的操作是允许的(但不鼓励)。

限制

字符串相等/不等(以及 isin/isnotin)仅支持可打印的 ASCII 字符。虽然不禁止,但不建议将 ==, !=, isin 或 isnotin 用于浮点值。

异常

inf 或 -inf 值用于比较 查询中涉及的列是 Categorical Symbol 已序列化 查询中涉及的列在 symbol 中不存在 查询涉及使用 <, <=, >, 或 >= 运算符比较字符串 查询涉及将字符串与一个或多个数值进行比较,反之亦然 查询涉及对包含字符串的列进行算术运算

方法 描述
apply

Apply 允许使用支持的 QueryBuilder 数值操作创建新列。有关更多信息,请参阅

concat

连接 symbols 列表。应是提供给以下之一的 QueryBuilder 中的第一个子句

date_range

要读取数据的 DateRange。仅适用于具有 DateTime 索引的 Pandas 数据。仅返回数据中落在给定范围内的一部分。

groupby

按列名对 symbol 进行分组。GroupBy 操作后必须跟一个聚合运算符。目前支持以下五种聚合运算符:

head

过滤掉所有数据,只保留前 n 行。如果 n 为负数,则返回除最后 n 行之外的所有行。

optimise_for_memory

在查询期间降低峰值内存使用,但会牺牲一些性能。

optimise_for_speed

尽可能快地处理查询(默认行为)

prepend

在此 QueryBuilder 已定义的任何处理之前,应用其他 QueryBuilder 中指定的处理。

resample

在索引上对 symbol 进行重采样。symbol 必须是 datetime 索引。重采样操作后必须跟一个聚合运算符。

row_range

要读取数据的行范围。包含下界,不包含上界。

tail

过滤掉所有数据,只保留最后 n 行。如果 n 为负数,则返回除前 n 行之外的所有行。

then

在此 QueryBuilder 已定义的任何处理之后,应用其他 QueryBuilder 中指定的处理。

apply

apply(name, expr)

Apply 允许使用支持的 QueryBuilder 数值操作创建新列。有关支持的表达式的更多信息,请参阅 QueryBuilder 类的文档 - 任何在过滤器中有效的表达式在使用 apply 时都有效。

参数 描述
name

要创建的列的名称

expr

表达式

示例

>>> df = pd.DataFrame(
    {
        "VWAP": np.arange(0, 10, dtype=np.float64),
        "ASK": np.arange(10, 20, dtype=np.uint16),
        "VOL_ACC": np.arange(20, 30, dtype=np.int32),
    },
    index=np.arange(10),
)
>>> lib.write("expression", df)
>>> q = adb.QueryBuilder()
>>> q = q.apply("ADJUSTED", q["ASK"] * q["VOL_ACC"] + 7)
>>> lib.read("expression", query_builder=q).data
VOL_ACC  ASK  VWAP  ADJUSTED
0     20   10   0.0       207
1     21   11   1.0       238
2     22   12   2.0       271
3     23   13   3.0       306
4     24   14   4.0       343
5     25   15   5.0       382
6     26   16   6.0       423
7     27   17   7.0       466
8     28   18   8.0       511
9     29   19   9.0       558
返回 描述
QueryBuilder

修改后的 QueryBuilder 对象。

concat

concat(join: str = 'outer')

连接 symbols 列表。应是提供给 NativeVersionStore.batch_read_and_join 或 Library.read_batch_and_join 的 QueryBuilder 中的第一个子句。

参数 描述
join

输入 symbol 的列是否应进行 inner 或 outer join。支持的输入包括 "inner" 和 "outer"。 * inner - 只有所有输入 symbol 中都存在的列才会出现在返回的 DataFrame 中。 * outer - 任何输入 symbol 中存在的列都会出现在返回的 DataFrame 中。某些输入 symbol 中存在但其他 symbol 中不存在的列将根据其类型使用与动态 schema 相同的规则进行回填。

TYPE: str DEFAULT: "outer"

返回 描述
QueryBuilder

修改后的 QueryBuilder 对象。

引发异常 描述
ArcticNativeException

join 参数不是 "inner" 或 "outer" 之一

示例

将 2 个 symbol 连接在一起,不进行任何预处理或后处理。

>>> df0 = pd.DataFrame(
    {
        "col1": [0.5],
        "col2": [1],
    },
    index=[pd.Timestamp("2025-01-01")],
)
>>> df1 = pd.DataFrame(
    {
        "col3": ["hello"],
        "col2": [2],
    },
    index=[pd.Timestamp("2025-01-02")],
)
>>> q = adb.QueryBuilder()
>>> q = q.concat("outer")
>>> lib.write("symbol0", df0)
>>> lib.write("symbol1", df1)
>>> lib.batch_read_and_join(["symbol0", "symbol1"], query_builder=q).data
                       col1     col2     col3
2025-01-01 00:00:00     0.5        1     None
2025-01-02 00:00:00     NaN        2  "hello"
>>> q = adb.QueryBuilder()
>>> q = q.concat("inner")
>>> lib.batch_read_and_join(["symbol0", "symbol1"], query_builder=q).data
                       col2
2025-01-01 00:00:00       1
2025-01-02 00:00:00       2

date_range

date_range(date_range: DateRangeInput)

要读取数据的 DateRange。仅适用于具有 DateTime 索引的 Pandas 数据。仅返回落在给定范围内的数据部分。如果这是唯一应用的 processing 子句,则返回的数据对象将比直接将 date_range 作为参数传递给 read 方法使用更少的内存,代价是可能会稍微慢一些。

参数 描述
date_range

与 read 方法接受的格式相同的日期范围。

TYPE: DateRangeInput

示例

>>> q = adb.QueryBuilder()
>>> q = q.date_range((pd.Timestamp("2000-01-01"), pd.Timestamp("2001-01-01")))
返回 描述
QueryBuilder

修改后的 QueryBuilder 对象。

groupby

groupby(name: str)

按列名对 symbol 进行分组。GroupBy 操作后必须跟一个聚合运算符。目前支持以下五种聚合运算符:

  • "mean" - 计算组的平均值
  • "sum" - 计算组的总和
  • "min" - 计算组的最小值
  • "max" - 计算组的最大值
  • "count" - 计算组的数量

有关使用示例,请参见下文。

参数 描述
name

要分组的列名。注意,目前 GroupBy 仅支持单列分组。

TYPE: str

示例

两个组的平均值 (mean)

>>> df = pd.DataFrame(
    {
        "grouping_column": ["group_1", "group_1", "group_1", "group_2", "group_2"],
        "to_mean": [1.1, 1.4, 2.5, np.nan, 2.2],
    },
    index=np.arange(5),
)
>>> q = adb.QueryBuilder()
>>> q = q.groupby("grouping_column").agg({"to_mean": "mean"})
>>> lib.write("symbol", df)
>>> lib.read("symbol", query_builder=q).data
           to_mean
 group_1  1.666667
 group_2       2.2

一个组的最大值

>>> df = pd.DataFrame(
    {
        "grouping_column": ["group_1", "group_1", "group_1"],
        "to_max": [1, 5, 4],
    },
    index=np.arange(3),
)
>>> q = adb.QueryBuilder()
>>> q = q.groupby("grouping_column").agg({"to_max": "max"})
>>> lib.write("symbol", df)
>>> lib.read("symbol", query_builder=q).data
         to_max
group_1  5

最大值和平均值

>>> df = pd.DataFrame(
    {
        "grouping_column": ["group_1", "group_1", "group_1"],
        "to_mean": [1.1, 1.4, 2.5],
        "to_max": [1.1, 1.4, 2.5]
    },
    index=np.arange(3),
)
>>> q = adb.QueryBuilder()
>>> q = q.groupby("grouping_column").agg({"to_max": "max", "to_mean": "mean"})
>>> lib.write("symbol", df)
>>> lib.read("symbol", query_builder=q).data
         to_max   to_mean
group_1     2.5  1.666667

一列的最小值和最大值,另一列的平均值

>>> df = pd.DataFrame(
    {
        "grouping_column": ["group_1", "group_1", "group_1", "group_2", "group_2"],
        "agg_1": [1, 2, 3, 4, 5],
        "agg_2": [1.1, 1.4, 2.5, np.nan, 2.2],
    },
    index=np.arange(5),
)
>>> q = adb.QueryBuilder()
>>> q = q.groupby("grouping_column")
>>> q = q.agg({"agg_1_min": ("agg_1", "min"), "agg_1_max": ("agg_1", "max"), "agg_2": "mean"})
>>> lib.write("symbol", df)
>>> lib.read("symbol", query_builder=q).data
         agg_1_min  agg_1_max     agg_2
group_1          1          3  1.666667
group_2          4          5       2.2
返回 描述
QueryBuilder

修改后的 QueryBuilder 对象。

head

head(n: int = 5)

过滤掉所有数据,只保留前 n 行。如果 n 为负数,则返回除最后 n 行之外的所有行。

参数 描述
n

如果为非负数,则选择的行数;否则为要排除的行数。

TYPE: int DEFAULT: 5

返回 描述
QueryBuilder

修改后的 QueryBuilder 对象。

optimise_for_memory

optimise_for_memory()

在查询期间降低峰值内存使用,但会牺牲一些性能。

已应用的优化

  • 在处理流水线中,从存储读取的段中存在的字符串所使用的内存(这些字符串在最终呈现给用户的 DataFrame 中不需要)会更早地被回收。

optimise_for_speed

optimise_for_speed()

尽可能快地处理查询(默认行为)

prepend

prepend(other)

在此 QueryBuilder 已定义的任何处理之前,应用其他 QueryBuilder 中指定的处理。

参数 描述
other

在此 QueryBuilder 之前在处理流水线中应用的 QueryBuilder。

返回 描述
QueryBuilder

修改后的 QueryBuilder 对象。

resample

resample(
    rule: Union[str, DateOffset],
    closed: Optional[str] = None,
    label: Optional[str] = None,
    offset: Optional[Union[str, Timedelta]] = None,
    origin: Union[str, Timestamp] = "epoch",
)

在索引上对 symbol 进行重采样。symbol 必须是 datetime 索引。重采样操作后必须跟一个聚合运算符。目前支持以下 7 种聚合运算符:

  • "mean" - 计算组的平均值
  • "sum" - 计算组的总和
  • "min" - 计算组的最小值
  • "max" - 计算组的最大值
  • "count" - 计算组的数量
  • "first" - 计算组中的第一个值
  • "last" - 计算组中的最后一个值

注意,并非所有聚合器都支持所有列类型

  • 数值列 - 支持所有聚合器
  • 布尔列 - 支持所有聚合器
  • 字符串列 - 支持 count, first 和 last 聚合器
  • Datetime 列 - 支持除 sum 外的所有聚合器

请注意,symbol 中不包含索引值的时间桶将不会包含在返回的 DataFrame 中。这与 Pandas 的默认行为不同。重采样目前不支持以下情况:

  • 动态 schema,其中聚合列在一个或多个行切片中缺失。
  • 稀疏数据。

重采样结果与 pandas resample 的 origin="epoch" 匹配。我们计划在未来版本中添加一个 'origin' 参数,并将默认值更改为 '"start_day"' 以匹配 Pandas 的默认值。这将在 rule 不是 24 小时倍数的情况下改变结果。

参数 描述
rule

对数据进行重采样的频率。支持的规则字符串包括 ns, us, ms, s, min, h, 和 D,以及它们的倍数/组合,例如 1h30min。也接受表示此集合中频率的 pd.DataOffset 对象。

TYPE: Union[str, DateOffset]

closed

每个时间桶的哪个边界是闭合的。必须是 'left' 或 'right' 之一。如果未提供,则对于目前支持的所有频率,默认值为 left。

TYPE: Optional[str] DEFAULT: None

label

每个时间桶的哪个边界用作返回 DataFrame 中的索引值。必须是 'left' 或 'right' 之一。如果未提供,则对于目前支持的所有频率,默认值为 left。

TYPE: Optional[str] DEFAULT: None

offset

偏移每个桶的开始。支持的字符串与 pd.Timedelta 中的相同。如果 offset 大于 rule,则使用 offset 对 rule 取模作为偏移量。

TYPE: Optional[Union[str, Timedelta]] DEFAULT: None

origin

用于调整分组的时间戳。支持的字符串有:

  • epoch: 起始点是 1970-01-01
  • start: 起始点是时间序列的第一个值
  • start_day: 起始点是时间序列第一个天的午夜
  • end: 起始点是时间序列的最后一个值
  • end_day: 起始点是最后一天的午夜

start, start_day, end, end_day 起始值不支持与 date_range 结合使用。

TYPE: Union[str, Timestamp] DEFAULT: 'epoch'

返回 描述
QueryBuilder

修改后的 QueryBuilder 对象。

引发异常 描述
ArcticDbNotYetImplemented

提供给 rule 参数的频率字符串或 Pandas DateOffset 对象不在上面列出的支持频率范围内。

ArcticNativeException

closed 或 label 参数不是 "left" 或 "right" 之一

SchemaException

在调用 read 时如果满足以下条件则引发:

  • 如果指定的聚合与上面指定的被聚合列的类型不兼容。
  • 库启用了动态 schema,并且被聚合的列中至少有一个在一个或多个行切片中缺失。
  • 被聚合的列中至少有一个包含稀疏数据。
UserInputException
  • start, start_day, end, end_daydate_range 结合使用
  • origin 不是 start, start_day, end, end_day, epochpd.Timestamp 之一

示例

将两小时的分钟级数据重采样为小时级数据,并对列 'to_sum' 求和

>>> df = pd.DataFrame(
    {
        "to_sum": np.arange(120),
    },
    index=pd.date_range("2024-01-01", freq="min", periods=120),
)
>>> q = adb.QueryBuilder()
>>> q = q.resample("h").agg({"to_sum": "sum"})
>>> lib.write("symbol", df)
>>> lib.read("symbol", query_builder=q).data
                     to_sum
2024-01-01 00:00:00    1770
2024-01-01 01:00:00    5370

同上,但指定每个时间桶的闭合边界是右侧,并按右边界标记输出

>>> q = adb.QueryBuilder()
>>> q = q.resample("h", closed="right", label="right").agg({"to_sum": "sum"})
>>> lib.read("symbol", query_builder=q).data
                     to_sum
2024-01-01 00:00:00       0
2024-01-01 01:00:00    1830
2024-01-01 02:00:00    5310

聚合中省略 Nones, NaNs 和 NaTs

>>> df = pd.DataFrame(
    {
        "to_mean": [1.0, np.nan, 2.0],
    },
    index=pd.date_range("2024-01-01", freq="min", periods=3),
)
>>> q = adb.QueryBuilder()
>>> q = q.resample("h").agg({"to_mean": "mean"})
>>> lib.write("symbol", df)
>>> lib.read("symbol", query_builder=q).data
                     to_mean
2024-01-01 00:00:00      1.5

输出列名可以通过传递给 agg 的字典格式来控制

>>> df = pd.DataFrame(
    {
        "agg_1": [1, 2, 3, 4, 5],
        "agg_2": [1.0, 2.0, 3.0, np.nan, 5.0],
    },
    index=pd.date_range("2024-01-01", freq="min", periods=5),
)
>>> q = adb.QueryBuilder()
>>> q = q.resample("h")
>>> q = q.agg({"agg_1_min": ("agg_1", "min"), "agg_1_max": ("agg_1", "max"), "agg_2": "mean"})
>>> lib.write("symbol", df)
>>> lib.read("symbol", query_builder=q).data
                     agg_1_min  agg_1_max     agg_2
2024-01-01 00:00:00          1          5      2.75

row_range

row_range(row_range: Tuple[int, int])

要读取数据的行范围。包含下界,不包含上界。行为应与 df.iloc[start:end] 相同,包括处理负数 start/end 值。

参数 描述
row_range

要读取数据的行范围。包含下界,不包含上界。

TYPE: Tuple[int, int]

返回 描述
QueryBuilder

修改后的 QueryBuilder 对象。

tail

tail(n: int = 5)

过滤掉所有数据,只保留最后 n 行。如果 n 为负数,则返回除前 n 行之外的所有行。

参数 描述
n

如果为非负数,则选择的行数;否则为要排除的行数。

TYPE: int DEFAULT: 5

返回 描述
QueryBuilder

修改后的 QueryBuilder 对象。

then

then(other)

在此 QueryBuilder 已定义的任何处理之后,应用其他 QueryBuilder 中指定的处理。

参数 描述
other

在此 QueryBuilder 之后在处理流水线中应用的 QueryBuilder。

返回 描述
QueryBuilder

修改后的 QueryBuilder 对象。