ArcticDB_demo_resample
In [ ]
!pip install arcticdb
In [2]
import numpy as np
import pandas as pd
import arcticdb as adb
ArcticDB Resampling Demo
This demo notebook showcases ArcticDB's high-performance resampling capability.
Here is what you need to know:
- It runs on the fly as part of the read
- That makes it much more efficient than Pandas on large datasets
- Usage is similar to the Pandas resample function (a minimal sketch of the pattern follows this list)
- You can apply multiple aggregators to each column
- It can be used to downsample high-frequency data into "bar" data (see example 4)
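As a preview, every example below follows the same lazy-read pattern. A minimal sketch (the lib object and the market_data symbol are created in the Setup and data sections that follow; the column and frequency here are placeholders):

lazy_df = lib.read('market_data', lazy=True)                  # builds a query, nothing is read yet
resampled = lazy_df.resample('1min').agg({'price': 'mean'})   # still lazy
result = resampled.collect().data                             # the resample executes as part of the read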
Setup
In [3]
# object store
arctic = adb.Arctic("lmdb://arcticdb_resample")
In [4]
# library
lib = arctic.get_library('resample', create_if_missing=True)
Create some data
- A time series with 12,000,000 rows on a 1-second index
- Integer, float and string columns
- Write the data into ArcticDB
In [5]
# data for resampling
index = pd.date_range("1990-01-01", periods=12_000_000, freq="s")
int_data = np.arange(len(index), dtype=np.uint64)
float_data = np.round(np.random.uniform(95., 105., len(index)), 3)
letters = ['a','b','c','d','e','f','g']
mkt_data = pd.DataFrame(
index=index,
data={
"id": int_data,
"price": float_data,
"category": (letters*(len(index)//len(letters) + 1))[:len(index)]
}
)
In [6]
# view the first 10 rows of the data
mkt_data.head(10)
Out[6]
 | id | price | category |
---|---|---|---|
1990-01-01 00:00:00 | 0 | 95.176 | a |
1990-01-01 00:00:01 | 1 | 97.872 | b |
1990-01-01 00:00:02 | 2 | 104.930 | c |
1990-01-01 00:00:03 | 3 | 103.573 | d |
1990-01-01 00:00:04 | 4 | 97.052 | e |
1990-01-01 00:00:05 | 5 | 103.435 | f |
1990-01-01 00:00:06 | 6 | 99.339 | g |
1990-01-01 00:00:07 | 7 | 103.358 | a |
1990-01-01 00:00:08 | 8 | 104.301 | b |
1990-01-01 00:00:09 | 9 | 104.651 | c |
In [7]
# write the data into ArcticDB
sym = 'market_data'
lib.write(sym, mkt_data)
Out[7]
VersionedItem(symbol='market_data', library='resample', data=n/a, version=0, metadata=None, host='LMDB(path=~/arcticdb_resample)', timestamp=1718958796318913629)
1. Simple resampling
- Resample to 1 minute
- Use a variety of aggregators
- Resampling can be thought of as a time-based groupby
- The groups are all the rows that fall within each time interval
- We also run it in Pandas to compare performance and results
In [8]
# frequency and aggregator params
freq1 = '1min'
aggs1 = {'id': 'max', 'price': 'last', 'category': 'count'}
In [9]
%%time
# create the resample query and apply it on the read
market_data_1min_df = lib.read(sym, lazy=True).resample(freq1).agg(aggs1).collect().data
print(len(market_data_1min_df))
market_data_1min_df.tail()
200000
CPU times: user 684 ms, sys: 251 ms, total: 935 ms
Wall time: 171 ms
Out[9]
 | id | price | category |
---|---|---|---|
1990-05-19 21:15:00 | 11999759 | 104.106 | 60 |
1990-05-19 21:16:00 | 11999819 | 104.456 | 60 |
1990-05-19 21:17:00 | 11999879 | 95.570 | 60 |
1990-05-19 21:18:00 | 11999939 | 103.967 | 60 |
1990-05-19 21:19:00 | 11999999 | 97.899 | 60 |
In [10]
%%time
# read the full data set and resample in Pandas
full_df = lib.read(sym).data
market_data_1min_pd_df = full_df.resample(freq1).agg(aggs1)
print(len(market_data_1min_pd_df))
market_data_1min_pd_df.tail()
200000
CPU times: user 1.6 s, sys: 401 ms, total: 2 s
Wall time: 1.15 s
Out[10]
 | id | price | category |
---|---|---|---|
1990-05-19 21:15:00 | 11999759 | 104.106 | 60 |
1990-05-19 21:16:00 | 11999819 | 104.456 | 60 |
1990-05-19 21:17:00 | 11999879 | 95.570 | 60 |
1990-05-19 21:18:00 | 11999939 | 103.967 | 60 |
1990-05-19 21:19:00 | 11999999 | 97.899 | 60 |
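As a quick check that the on-the-fly resample matches the Pandas result, the two frames can be compared directly. A minimal sketch (check_dtype=False because the dtype of the count column can differ between the two):

# confirm ArcticDB and Pandas produce the same resampled values
pd.testing.assert_frame_equal(market_data_1min_df, market_data_1min_pd_df, check_dtype=False)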
2. Multiple aggregators per column
- Similar to NamedAgg in Pandas (a Pandas equivalent is sketched after the output below)
- Downsample to a 5-minute frequency
- Apply two aggregators, last and count, to the price column
- With multiple aggregators, the syntax is
output_column_name: (input_column_name, aggregator)
In [11]
freq2 = '5min'
aggs2 = {'id': 'max', 'price_last': ('price', 'last'), 'price_count': ('price', 'count'), 'category': 'first'}
In [12]
%%time
lib.read(sym, lazy=True).resample(freq2).agg(aggs2).collect().data
CPU times: user 1.07 s, sys: 415 ms, total: 1.49 s Wall time: 151 ms
Out[12]
 | id | category | price_count | price_last |
---|---|---|---|---|
1990-01-01 00:00:00 | 299 | a | 300 | 102.172 |
1990-01-01 00:05:00 | 599 | g | 300 | 101.450 |
1990-01-01 00:10:00 | 899 | f | 300 | 96.718 |
1990-01-01 00:15:00 | 1199 | e | 300 | 96.345 |
1990-01-01 00:20:00 | 1499 | d | 300 | 98.955 |
... | ... | ... | ... | ... |
1990-05-19 20:55:00 | 11998799 | d | 300 | 100.277 |
1990-05-19 21:00:00 | 11999099 | c | 300 | 103.596 |
1990-05-19 21:05:00 | 11999399 | b | 300 | 96.182 |
1990-05-19 21:10:00 | 11999699 | a | 300 | 99.911 |
1990-05-19 21:15:00 | 11999999 | g | 300 | 97.899 |
40000 rows × 4 columns
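For reference, the same multi-aggregator request in Pandas uses NamedAgg-style keyword arguments. A rough equivalent on the full_df read earlier (a sketch for comparison, not timed here):

pandas_5min = full_df.resample('5min').agg(
    id=('id', 'max'),
    price_last=('price', 'last'),
    price_count=('price', 'count'),
    category=('category', 'first'),
)
pandas_5min.tail()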
3. Processing pipelines: chaining operations
- Downsample to a 2.5-minute frequency
- Group the resampled data by the category column
- Aggregate the category groups with mean (a step-by-step version of the chained query is sketched after the output below)
In [14]
%%time
lib.read(sym, lazy=True).resample('2min30s').agg({'id': 'min', 'category': 'first'}).groupby('category').agg({'id': 'mean'}).collect().data
CPU times: user 1.12 s, sys: 309 ms, total: 1.43 s Wall time: 183 ms
Out[14]
category | id |
---|---|
a | 5999700.0 |
b | 5999925.0 |
d | 5999850.0 |
e | 6000075.0 |
f | 5999775.0 |
g | 6000000.0 |
c | 6000150.0 |
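The chained query above can also be built up step by step, which is easier to read as pipelines grow. A minimal sketch, assuming each lazy operation can be held in a variable before collect():

lazy = lib.read(sym, lazy=True)                                   # nothing is read yet
lazy = lazy.resample('2min30s').agg({'id': 'min', 'category': 'first'})
lazy = lazy.groupby('category').agg({'id': 'mean'})
lazy.collect().data                                               # the whole pipeline executes during the read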
4. Example: OHLC (open, high, low, close) bars
- Downsample to a 5-minute frequency
- Use multiple aggregators on the price column
- This is a simple example of how to turn tick data into OHLC bar data
In [15]
freq_ohlc = '5min'
agg_ohlc = {
'open': ('price', 'first'),
'high': ('price', 'max'),
'low': ('price', 'min'),
'close': ('price', 'last')
}
In [16]
%%time
ohlc_5min_bars = lib.read(sym, lazy=True).resample(freq_ohlc).agg(agg_ohlc).collect().data
CPU times: user 1.26 s, sys: 492 ms, total: 1.75 s Wall time: 118 ms
In [17]
ohlc_5min_bars.head()
Out[17]
 | close | low | high | open |
---|---|---|---|---|
1990-01-01 00:00:00 | 102.172 | 95.076 | 104.992 | 95.176 |
1990-01-01 00:05:00 | 101.450 | 95.008 | 104.999 | 98.520 |
1990-01-01 00:10:00 | 96.718 | 95.053 | 104.990 | 103.959 |
1990-01-01 00:15:00 | 96.345 | 95.070 | 104.969 | 95.878 |
1990-01-01 00:20:00 | 98.955 | 95.011 | 104.983 | 103.538 |
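Note that the aggregated columns do not necessarily come back in the order of the aggregation dict. If you want the conventional open/high/low/close column order, you can reorder the result after the read:

# reorder into the conventional OHLC column order
ohlc_5min_bars = ohlc_5min_bars[['open', 'high', 'low', 'close']]
ohlc_5min_bars.head()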
Conclusion
We have demonstrated the following features of ArcticDB resampling:
- It is easy to use, especially if you already use resampling in Pandas
- It performs very well, in particular much faster than reading all the data and then resampling in Pandas
- It can be combined with other query functions to build processing pipelines
- It can be used to generate time-series bars