ArcticDB_billion_row_challenge
View on GitHub | Open in Google Colab
ArcticDB Billion Row Challenge Notebook¶
Setup¶
- Installs
- Imports
- Create the ArcticDB object store
- Define the parameters of the problem
- Create an ArcticDB library to hold the data
In [ ]
!pip install arcticdb
In [ ]
from arcticdb.config import set_config_int
set_config_int('VersionStore.NumCPUThreads', 16)
In [ ]
import pandas as pd
import numpy as np
import arcticdb as adb
In [3]
arctic = adb.Arctic("lmdb://arcticdb_brc")
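The connection string above selects local LMDB storage. As an illustrative aside (these alternative URIs are assumptions drawn from ArcticDB's documented backends, not part of this notebook), the same code works against other storage backends simply by changing the URI:
# Hypothetical alternatives to the LMDB URI used above (shown as comments only):
# arctic = adb.Arctic("mem://")                                    # in-memory store, handy for quick tests
# arctic = adb.Arctic("s3://MY_ENDPOINT:MY_BUCKET?aws_auth=true")  # S3 object storage (bucket and credentials assumed configured)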
In [ ]
sym_1brc = 'weather_stations_1brc'
num_cities = 10_000
num_rows = 1_000_000_000
aggs = {
'max': ('Temperature', 'max'),
'min': ('Temperature', 'min'),
'mean': ('Temperature', 'mean')
}
num_blocks = 16
block_size = num_rows // num_blocks
seed = 17
cities = np.array([f"city_{i:04d}" for i in range(num_cities)])
print(f"block_size: {block_size:,d}, total records: {block_size*num_blocks:,d}")
In [17]
lib_name = 'arcticdb_brc'
# delete the library if it already exists
arctic.delete_library(lib_name)
# performance tuning: a large rows_per_segment value can improve performance for dataframes with a large number of rows
lib_options = adb.LibraryOptions(rows_per_segment=10_000_000)
lib = arctic.get_library(lib_name, create_if_missing=True, library_options=lib_options)
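As a quick, optional sanity check (not part of the original notebook), you can confirm the library was created and see that it is still empty before any data is written:
# Optional check that the library exists (illustrative, not in the original notebook)
print(arctic.list_libraries())   # should include 'arcticdb_brc'
print(lib.list_symbols())        # empty list before any data is written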
Write the Data into ArcticDB¶
- Generate the data: each row has a city chosen at random from the list and a random temperature between -99.9 and 99.9
- The data is written in blocks to control memory usage
In [18]
def create_block_df(rng, cities, block_size):
random_cities = rng.choice(cities, size=block_size)
random_temperatures = np.round(rng.uniform(-99.9, 99.9, size=block_size), 4)
return pd.DataFrame({'City': random_cities, 'Temperature': random_temperatures})
In [ ]
rng = np.random.default_rng(seed)
print('Writing blocks: ', end='')
for b in range(num_blocks):
block_df = create_block_df(rng, cities, block_size)
if b==0:
lib.write(sym_1brc, block_df)
else:
lib.append(sym_1brc, block_df, validate_index=False)
print(f'{b}, ', end='')
print(' Finished')
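Once the loop finishes, a lightweight way to confirm that all 16 blocks landed (an optional check, assuming the get_description API; not shown in the original notebook) is to read the symbol's metadata rather than the data itself:
# Optional verification (assumes Library.get_description is available)
desc = lib.get_description(sym_1brc)
print(f"rows written: {desc.row_count:,d}")   # expected: 1,000,000,000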
Read and Aggregate the Data¶
- Use the DataFrame processing operations in ArcticDB to group and aggregate the data
- This lets the high-performance, multi-threaded C++ layer do the heavy lifting
- This code uses too much memory to run on the free version of Google Colab. The chunked version below works fine there
In [20]
%%timeit
# this runs the query several times to get an accurate timing
lazy_df = lib.read(sym_1brc, lazy=True)
lazy_df.groupby('City').agg(aggs)
lazy_df.collect()
7.01 s ± 604 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [22]
# run the query once more to see the output
lib.read(sym_1brc, lazy=True).groupby('City').agg(aggs).collect().data.sort_index().round(1)
Out[22]
| City | min | mean | max |
| --- | --- | --- | --- |
| city_0000 | -99.9 | 0.0 | 99.8 |
| city_0001 | -99.9 | 0.2 | 99.9 |
| city_0002 | -99.7 | 0.2 | 99.9 |
| city_0003 | -99.9 | 0.1 | 99.9 |
| city_0004 | -99.8 | -0.2 | 99.9 |
| ... | ... | ... | ... |
| city_9995 | -99.9 | -0.4 | 99.9 |
| city_9996 | -99.9 | 0.1 | 99.9 |
| city_9997 | -99.9 | 0.7 | 99.9 |
| city_9998 | -99.9 | -1.3 | 99.9 |
| city_9999 | -99.8 | -0.2 | 99.9 |
10000 rows × 3 columns
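For comparison, the same group-by can be expressed in plain pandas on a manageable slice of the symbol (an illustrative aside, not part of the original notebook; it assumes the row_range read parameter and only covers the first million rows, so the numbers will differ slightly from the full result):
# Illustrative pandas equivalent on a small slice (assumes the row_range parameter of lib.read)
sample = lib.read(sym_1brc, row_range=(0, 1_000_000)).data
print(sample.groupby('City')['Temperature'].agg(['min', 'mean', 'max']).head())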
Conclusions¶
- The billion row challenge is easy to solve with ArcticDB
- The code is short and easy to read
- Performance is good with almost no tuning needed
Bonus: Read and Aggregate in Chunks¶
- This version reads and aggregates the data in chunks
- It produces the same results as the simpler version above
- It performs almost as well and needs less memory
- In particular, it can run within the memory limits of the free version of Google Colab
In [23]
# we need to aggregate sum and count to get the aggregated mean
aggs_chunked = {
'max': ('Temperature', 'max'),
'min': ('Temperature', 'min'),
'sum': ('Temperature', 'sum'),
'count': ('Temperature', 'count')
}
# define a list of ReadRequests - the chunks are based on row_ranges
read_requests = [adb.ReadRequest(symbol=sym_1brc,
row_range=(block_size*b, block_size*(b+1)))
for b in range(num_blocks)
]
In [24]
# these functions merge the results of the chunks into one result
def merge_results_pair(r0, r1):
join_r = r0.join(r1, lsuffix='_0', rsuffix='_1')
return pd.DataFrame(index=join_r.index,
data={
'min': join_r[['min_0', 'min_1']].min(axis=1),
'max': join_r[['max_0', 'max_1']].max(axis=1),
'count': join_r[['count_0', 'count_1']].sum(axis=1),
'sum': join_r[['sum_0', 'sum_1']].sum(axis=1),
}
)
def merge_results(r):
res = r[0].data.sort_index()
for b in range(1, len(r)):
next_res = r[b].data.sort_index()
res = merge_results_pair(res, next_res)
res['mean'] = res['sum'] / res['count']
res = res.drop(columns=['sum', 'count']).loc[:, ['min', 'mean', 'max']].round(1)
return res
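To make the merging logic concrete, here is a tiny worked example (hypothetical toy data, not part of the original notebook) showing how merge_results_pair combines the partial statistics for a single city from two chunks:
# Toy illustration of merge_results_pair (hypothetical data)
idx = pd.Index(['city_0000'], name='City')
r0 = pd.DataFrame({'min': [-5.0], 'max': [4.0], 'sum': [10.0], 'count': [7]}, index=idx)
r1 = pd.DataFrame({'min': [-9.0], 'max': [3.0], 'sum': [-2.0], 'count': [5]}, index=idx)
print(merge_results_pair(r0, r1))  # min=-9.0, max=4.0, count=12, sum=8.0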
In [25]
lazy_df_collection = lib.read_batch(read_requests, lazy=True)
# Apply the same processing to each chunk
lazy_df_collection.groupby('City').agg(aggs_chunked)
read_results = lazy_df_collection.collect()
results = merge_results(read_results)
results
Out[25]
| City | min | mean | max |
| --- | --- | --- | --- |
| city_0000 | -99.9 | 0.0 | 99.8 |
| city_0001 | -99.9 | 0.2 | 99.9 |
| city_0002 | -99.7 | 0.2 | 99.9 |
| city_0003 | -99.9 | 0.1 | 99.9 |
| city_0004 | -99.8 | -0.2 | 99.9 |
| ... | ... | ... | ... |
| city_9995 | -99.9 | -0.4 | 99.9 |
| city_9996 | -99.9 | 0.1 | 99.9 |
| city_9997 | -99.9 | 0.7 | 99.9 |
| city_9998 | -99.9 | -1.3 | 99.9 |
| city_9999 | -99.8 | -0.2 | 99.9 |
10000 rows × 3 columns
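Finally, an optional equality check (illustrative only, not part of the original notebook) confirms that the chunked result matches the single-query result from earlier. Note that it reruns the full query, so it needs the same memory as the simple version:
# Optional check that the chunked and single-query results agree (illustrative)
single = lib.read(sym_1brc, lazy=True).groupby('City').agg(aggs).collect().data
single = single.sort_index().loc[:, ['min', 'mean', 'max']].round(1)
print(results.equals(single))   # expected: True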