跳到内容

引言

logo

什么是 ArcticDB?

ArcticDB 是一个为 Python 数据科学生态系统设计的无服务器 DataFrame 数据库引擎。

ArcticDB 支持您在大规模环境下存储、检索和处理 DataFrame,并由通用对象存储(兼容 S3 的存储和 Azure Blob Storage)提供支持。

ArcticDB 除了正在运行的 Python 环境和对象存储访问之外,无需额外基础设施,并且可以在几秒钟内安装完成。

ArcticDB 的特点:

  • 快速
    • 单个消费者每秒可处理高达 1 亿行
    • 所有消费者每秒可处理十亿行
    • 快速简便安装:pip install arcticdb
  • 灵活
    • 无需数据模式
    • 支持流式数据摄入
    • 双时态 - 存储所有之前版本的数据
    • 易于在本地和云上设置
    • 可从开发/研究环境扩展到生产环境
  • 熟悉
    • ArcticDB 是世界上最简单的可共享数据库
    • 对于有 Python 和 Pandas 经验的人来说易于学习
    • 只需您和您的数据 - 认知开销非常低。

入门

本节将介绍安装、设置和基本用法。有关基础知识和高级功能的更多详细信息,请参见教程部分。

安装

ArcticDB 支持 Python 3.7 - 3.13。

安装 ArcticDB,只需运行

pip install arcticdb

设置

ArcticDB 是一个为对象存储设计的存储引擎,同时也支持使用 LMDB 进行本地磁盘存储。

存储

ArcticDB 支持任何兼容 S3 API 的存储,包括 AWS 和 Azure,以及 VAST Universal StoragePure Storage 等存储设备。

ArcticDB 也支持 LMDB 用于本地/文件存储 - 要使用 LMDB,将 LMDB 路径作为 URI 传递:adb.Arctic('lmdb://path/to/desired/database')

入门时,我们可以导入 ArcticDB 并实例化它

import arcticdb as adb
# this will set up the storage using the local file system
uri = "lmdb://tmp/arcticdb_intro"
ac = adb.Arctic(uri)

有关如何为其他存储正确格式化 uri 字符串的更多信息,请查看文档字符串 (help(Arctic)) 或阅读存储访问部分(点击链接或继续阅读本节下方内容)。

库设置

ArcticDB 旨在存储许多(可能数百万)个表。单个表 (DataFrame) 称为 symbols,存储在称为 libraries 的集合中。一个 library 可以存储许多 symbol。

Libraries 必须先在使用前初始化

ac.create_library('intro')  # static schema - see note below
ac.list_libraries()
输出
['intro']

然后必须在代码中实例化该 library,以便准备读写数据

library = ac['intro']

有时使用这种形式结合 library 创建和实例化会更方便,这样会在需要时自动创建 library,省去检查其是否已存在的步骤

library = ac.get_library('intro', create_if_missing=True)

ArcticDB 静态和动态模式

与许多其他数据库不同,ArcticDB 不需要数据模式。您可以写入任何 DataFrame 并稍后读回。如果数据的形状发生变化然后再次写入,一切都会正常工作。简单方便。

唯一需要模式的情况是修改现有 symbol 的函数:updateappend。修改 symbol 时,新数据必须与现有数据具有相同的模式。这里的模式指 DataFrame 中索引的类型以及每列的名称、顺序和类型。换句话说,当您追加新行时,它们必须看起来像现有行。这是默认选项,称为 static schema

但是,如果您需要通过 updateappend 添加、删除或更改列的类型,则可以做到。您只需在创建 library 时设置 dynamic_schema 选项即可。参见 (create_library) 方法的 library_options 参数。

因此您拥有两全其美的优势 - 您可以选择对数据强制执行静态模式,使其不被修改操作更改,也可以允许其灵活变动。

使用静态或动态模式的选择必须在创建 library 时设置。

为清楚起见,本节我们使用的是 static schema

读写数据

现在我们已经设置好了一个 library,可以开始读写数据了。ArcticDB 提供了用于 DataFrame 存储的一组简单函数。

让我们将一个 DataFrame 写入存储。

首先创建数据

# 50 columns, 25 rows, random data, datetime indexed.
import pandas as pd
import numpy as np
from datetime import datetime
cols = ['COL_%d' % i for i in range(50)]
df = pd.DataFrame(np.random.randint(0, 50, size=(25, 50)), columns=cols)
df.index = pd.date_range(datetime(2000, 1, 1, 5), periods=25, freq="h")
df.head(5)
输出(数据的前 5 行)
                     COL_0  COL_1  COL_2  COL_3  COL_4  COL_5  COL_6  COL_7  ...
2000-01-01 05:00:00     18     48     10     16     38     34     25     44  ...
2000-01-01 06:00:00     48     10     24     45     22     36     30     19  ...
2000-01-01 07:00:00     25     16     36     29     25      9     48      2  ...
2000-01-01 08:00:00     38     21      2      1     37      6     31     31  ...
2000-01-01 09:00:00     45     17     39     47     47     11     33     31  ...

然后写入 library

library.write('test_frame', df)
输出(关于写入内容的信息)
VersionedItem(symbol=test_frame,library=intro,data=n/a,version=0,metadata=None,host=<host>)

'test_frame' DataFrame 将用于本指南的剩余部分。

ArcticDB 索引

写入 Pandas DataFrame 时,ArcticDB 支持以下索引类型

  • 包含 int64pandas.Index(或相应的专用类型 Int64Index, UInt64Index
  • RangeIndex
  • DatetimeIndex
  • 由上述支持类型组成的 MultiIndex

head()/tail() 中的“行”概念指的是行号('iloc'),而不是 pandas.Index 中的值('loc')。

从存储中读回数据

from_storage_df = library.read('test_frame').data
from_storage_df.head(5)
输出(前 5 行,但从数据库中读取)
                     COL_0  COL_1  COL_2  COL_3  COL_4  COL_5  COL_6  COL_7  ...
2000-01-01 05:00:00     18     48     10     16     38     34     25     44  ...
2000-01-01 06:00:00     48     10     24     45     22     36     30     19  ...
2000-01-01 07:00:00     25     16     36     29     25      9     48      2  ...
2000-01-01 08:00:00     38     21      2      1     37      6     31     31  ...
2000-01-01 09:00:00     45     17     39     47     47     11     33     31  ...

读取的数据与原始数据当然是匹配的。

切片和过滤

ArcticDB 使您能够按行和按列进行切片。

ArcticDB 索引

ArcticDB 将为有序的数值和时间序列(例如 DatetimeIndex)Pandas 索引构建完整索引。这将实现跨索引条目的优化切片。如果索引未排序或非数字,您的数据仍然可以存储,但行切片会较慢。

行切片

library.read('test_frame', date_range=(df.index[5], df.index[8])).data
输出(请求数据范围内的行)
                     COL_0  COL_1  COL_2  COL_3  COL_4  COL_5  COL_6  COL_7  ...
2000-01-01 10:00:00     23     39      0     45     15     28     10     17  ...
2000-01-01 11:00:00     36     28     22     43     23      6     10      1  ...
2000-01-01 12:00:00     18     42      1     15     19     36     41     36  ...
2000-01-01 13:00:00     28     32     47     37     17     44     29     24  ...

列切片

_range = (df.index[5], df.index[8])
_columns = ['COL_30', 'COL_31']
library.read('test_frame', date_range=_range, columns=_columns).data
输出(请求日期范围内的行和列)
                     COL_30  COL_31
2000-01-01 10:00:00      31       2
2000-01-01 11:00:00       3      34
2000-01-01 12:00:00      24      43
2000-01-01 13:00:00      18       8

过滤和分析

ArcticDB 支持许多常见的 DataFrame 分析操作,包括过滤、投影、分组、聚合和重采样。访问这些操作最直观的方式是通过 LazyDataFrame API,这对于有 Pandas 或 Polars 经验的用户应该会感到熟悉。

旧版的 QueryBuilder 类也可以直接创建并传递给 read 调用,效果相同。

ArcticDB 分析理念

在大多数情况下,这比等效的 Pandas 操作更节省内存且性能更佳,因为处理发生在 C++ 存储引擎内部,并且在多个执行线程上并行化。

import arcticdb as adb
_range = (df.index[5], df.index[8])
_cols = ['COL_30', 'COL_31']
# Using lazy evaluation
lazy_df = library.read('test_frame', date_range=_range, columns=_cols, lazy=True)
lazy_df = lazy_df[(lazy_df["COL_30"] > 10) & (lazy_df["COL_31"] < 40)]
df = lazy_df.collect().data
# Using the legacy QueryBuilder class gives the same result
q = adb.QueryBuilder()
q = q[(q["COL_30"] > 10) & (q["COL_31"] < 40)]
library.read('test_frame', date_range=_range, columns=_cols, query_builder=q).data
输出(按日期范围、列以及基于数据值进行过滤的查询过滤后的数据)
                     COL_30  COL_31
2000-01-01 10:00:00      31       2
2000-01-01 13:00:00      18       8

修改、版本控制(又称时间旅行)

ArcticDB 完全支持通过两种基本操作修改存储的数据:updateappend

这些操作是原子的,但不锁定 symbol。有关更多信息,请参阅关于事务的部分。

追加(Append)

让我们将数据追加到时间序列的末尾。

首先,我们将查看数据的最后几条记录(在修改之前)

library.tail('test_frame', 4).data
输出
                     COL_0  COL_1  COL_2  COL_3  COL_4  COL_5  COL_6  COL_7  ...
2000-01-02 02:00:00     46     12     38     47      4     31      1     42  ...
2000-01-02 03:00:00     46     20      5     42      8     35     12      2  ...
2000-01-02 04:00:00     17     48     36     43      6     46      5      8  ...
2000-01-02 05:00:00     20     19     24     44     29     32      2     19  ...
然后创建 3 条新行来追加。为了使 append 工作,新数据的第一行 datetime 必须在现有数据之后开始。

random_data = np.random.randint(0, 50, size=(3, 50))
df_append = pd.DataFrame(random_data, columns=['COL_%d' % i for i in range(50)])
df_append.index = pd.date_range(datetime(2000, 1, 2, 7), periods=3, freq="h")
df_append
输出
                     COL_0  COL_1  COL_2  COL_3  COL_4  COL_5  COL_6  COL_7  ...
2000-01-02 07:00:00      9     15      4     48     48     35     34     49  ...
2000-01-02 08:00:00     35      4     12     30     30     12     38     25  ...
2000-01-02 09:00:00     25     17      3      1      1     15     33     49  ...

现在将该 DataFrame 追加到之前写入的数据中

library.append('test_frame', df_append)
输出
VersionedItem(symbol=test_frame,library=intro,data=n/a,version=1,metadata=None,host=<host>)
然后查看最后 5 行,看看发生了什么

library.tail('test_frame', 5).data
输出
                     COL_0  COL_1  COL_2  COL_3  COL_4  COL_5  COL_6  COL_7  ...
2000-01-02 04:00:00     17     48     36     43      6     46      5      8  ...
2000-01-02 05:00:00     20     19     24     44     29     32      2     19  ...
2000-01-02 07:00:00      9     15      4     48     48     35     34     49  ...
2000-01-02 08:00:00     35      4     12     30     30     12     38     25  ...
2000-01-02 09:00:00     25     17      3      1      1     15     33     49  ...

最后的 5 行由之前写入的最后两行和我们刚刚追加的 3 条新行组成。

Append 对于向大型时间序列末尾添加新数据非常有用。

更新(Update)

update 基本操作使您能够覆盖一块连续的数据。这将导致修改一些行并删除其他行,正如我们在下面的示例中将看到的。

这里我们创建一个用于 update 的新 DataFrame,只有 2 行,相隔 2 小时

random_data = np.random.randint(0, 50, size=(2, 50))
df = pd.DataFrame(random_data, columns=['COL_%d' % i for i in range(50)])
df.index = pd.date_range(datetime(2000, 1, 1, 5), periods=2, freq="2h")
df
输出(仅显示由 iloc[] 选择的行 0 和 2)
                     COL_0  COL_1  COL_2  COL_3  COL_4  COL_5  COL_6  COL_7  ...
2000-01-01 05:00:00     47     49     15      6     22     48     45     22  ...
2000-01-01 07:00:00     46     10      2     49     24     49      8      0  ...
现在 update 该 symbol
library.update('test_frame', df)
输出(关于 update 的信息)
VersionedItem(symbol=test_frame,library=intro,data=n/a,version=2,metadata=None,host=<host>)

现在让我们看看该 symbol 中的前 4 行

library.head('test_frame', 4).data  # head/tail are similar to the equivalent Pandas operations
输出
                     COL_0  COL_1  COL_2  COL_3  COL_4  COL_5  COL_6  COL_7  ...
2000-01-01 05:00:00     47     49     15      6     22     48     45     22  ...
2000-01-01 07:00:00     46     10      2     49     24     49      8      0  ... 
2000-01-01 08:00:00     38     21      2      1     37      6     31     31  ... 
2000-01-01 09:00:00     45     17     39     47     47     11     33     31  ...

让我们解析一下如何得到这个结果。update 操作

  • 用索引匹配的新数据替换了 symbol 中的数据(在本例中是 05:00 和 07:00 行)
  • 删除了新数据日期范围内但不在新数据索引中的任何行(在本例中是 06:00 行)
  • 保持了数据的其余部分不变(在本例中是 09:00 及之后的行)

从逻辑上讲,这相当于用新数据替换旧数据的完整日期范围,这与您对 update 操作的预期一致。

版本控制(Versioning)

您可能已经注意到,read 调用不直接返回数据 - 而是返回一个 VersionedItem 结构。您可能还注意到修改操作(write, appendupdate)会增加版本号。ArcticDB 会对所有修改进行版本控制,这意味着您可以检索数据的早期版本 - 它是一个双时态数据库

library.tail('test_frame', 7, as_of=0).data
输出
                     COL_0  COL_1  COL_2  COL_3  COL_4  COL_5  COL_6  COL_7  ...
2000-01-01 23:00:00     16     46      3     45     43     14     10     27  ...
2000-01-02 00:00:00     37     37     20      3     49     38     23     46  ...
2000-01-02 01:00:00     42     47     40     27     49     41     11     26  ...
2000-01-02 02:00:00     46     12     38     47      4     31      1     42  ...
2000-01-02 03:00:00     46     20      5     42      8     35     12      2  ...
2000-01-02 04:00:00     17     48     36     43      6     46      5      8  ...
2000-01-02 05:00:00     20     19     24     44     29     32      2     19  ...

注意时间戳 - 我们读取的是 append 操作之前的数据。请注意,您也可以将 datetime 传递给任何 as_of 参数,这将导致读取早于传递的 datetime 的最后一个版本。

版本控制、清除旧版本与快照

默认情况下,write, appendupdate 操作不会删除旧版本。请注意,这将占用更多空间。

此行为可以通过 prune_previous_versions 关键字参数控制。这将节省空间,但旧版本将不再可用。

可以通过使用快照来实现折衷,快照允许保存 library 的状态并在以后读回。这样可以保护某些版本不被删除,它们将在快照被删除时被删除。有关详情,请参见快照文档

存储访问

S3 配置

有两种方法配置 S3 访问。如果您知道访问密钥和秘密密钥,只需按如下方式连接

import arcticdb as adb
ac = adb.Arctic('s3://ENDPOINT:BUCKET?region=blah&access=ABCD&secret=DCBA')

否则,您可以将身份验证委托给 AWS SDK(遵循标准 AWS 配置选项

ac = adb.Arctic('s3://ENDPOINT:BUCKET?aws_auth=true')

同上,但使用 HTTPS

ac = adb.Arctic('s3s://ENDPOINT:BUCKET?aws_auth=true')

S3

如果您的 S3 端点使用 HTTPS,请使用 s3s

连接到已定义的存储端点

使用预定义的访问密钥和存储密钥连接到本地存储(非 AWS - s3.local 的 HTTP 端点)

ac = adb.Arctic('s3://s3.local:arcticdb-test-bucket?access=EFGH&secret=HGFE')
连接到 AWS

使用预定义的区域连接到 AWS

ac = adb.Arctic('s3s://s3.eu-west-2.amazonaws.com:arcticdb-test-bucket?aws_auth=true')

请注意,没有给出明确的凭据参数。当传递 aws_auth 时,身份验证将委托给 AWS SDK,后者负责在 .config 文件或环境变量中查找相应的凭据。您可以通过设置 AWS_PROFILE 环境变量来手动配置使用哪个配置文件,如 AWS 文档中所述。

使用 bucket 中的特定路径

您可能希望将 ArcticDB library 的访问权限限制在 bucket 中的特定路径。为此,您可以使用 path_prefix 参数

ac = adb.Arctic('s3s://s3.eu-west-2.amazonaws.com:arcticdb-test-bucket?path_prefix=test&aws_auth=true')

Azure

ArcticDB 使用 Azure 连接字符串来定义连接

import arcticdb as adb
ac = adb.Arctic('azure://AccountName=ABCD;AccountKey=EFGH;BlobEndpoint=ENDPOINT;Container=CONTAINER')

例如

import arcticdb as adb
ac = adb.Arctic("azure://CA_cert_path=/etc/ssl/certs/ca-certificates.crt;BlobEndpoint=https://arctic.blob.core.windows.net;Container=acblob;SharedAccessSignature=sp=awd&st=2001-01-01T00:00:00Z&se=2002-01-01T00:00:00Z&spr=https&rf=g&sig=awd%3D")

更多信息,请参见 Arctic 类参考

LMDB

LMDB 支持配置其 map size。请参见其文档

您可能需要在 Windows 上进行调整,而在 Linux 上默认值要大得多,应该足够。这是因为 Windows 会急切地为 map 文件分配物理空间,而 Linux 上的 map size 只是将使用的物理空间的上限。

您可以在连接字符串中设置 map size

import arcticdb as adb
ac = adb.Arctic('lmdb://path/to/desired/database?map_size=2GB')

Windows 上的默认值是 2GiB。出现 lmdb errror code -30792 错误表示 map 快满了,您应该增加其大小。如果您正在进行大量写入,就会发生这种情况。

在每个 Python 进程中,应确保对于给定的 LMDB 数据库,只有一个 Arctic 实例打开。

LMDB 不适用于远程文件系统。

内存配置

提供内存后端主要用于测试和实验。在不希望使用 LMDB 创建文件时,它可能很有用。

没有配置参数,内存完全由 Arctic 实例拥有。

例如

import arcticdb as adb
ac = adb.Arctic('mem://')

对于并发访问本地后端,我们建议将 LMDB 连接到 tmpfs,参见LMDB 和内存教程

事务

  • 事务可能非常有用,但通常昂贵且缓慢
  • 如果我们分解 ACID:原子性(Atomicity)、一致性(Consistency)和持久性(Durability)很有用,而隔离性(Isolation)则不那么重要
  • 大多数分析工作流程都可以构建为无需事务即可运行
  • 那么当事务通常不需要时,为何要承担其成本?
  • ArcticDB 没有事务,因为它专为高吞吐量分析工作负载而设计