ArcticDB_aws_public_blockchain
Loading AWS Bitcoin blockchain data into ArcticDB, using AWS as storage

In this demo we illustrate how to use AWS together with ArcticDB. We will¶
- Set up AWS access
- Initialise ArcticDB with AWS as the storage
- Read a subset of the Bitcoin blockchain data from the AWS public datasets
- Store the data in ArcticDB
- Read the data back
- Perform some simple analysis on the data
Note: this setup is designed to run on Google Colab. Running in other environments needs some simple changes to remove the Google Drive code; see the sketch below.
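For example, outside Colab the Drive mount can be dropped and the credentials file pointed at a local path instead. A minimal sketch, assuming a standard local AWS setup (the path is an assumption, adjust it to wherever your AWS config lives):

import os

# Non-Colab alternative: skip drive.mount() and use a local credentials file
os.environ['AWS_SHARED_CREDENTIALS_FILE'] = os.path.expanduser('~/.aws/credentials')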
Install ArcticDB and the S3 libraries¶
In [ ]:
# s3fs is used by pandas.read_parquet('s3://...')
%pip install arcticdb boto3 tqdm s3fs fastparquet
Imports¶
In [2]:
import os
from uuid import uuid4
from datetime import timedelta, datetime
from tqdm import tqdm
import boto3
import numpy as np
import pandas as pd
from botocore import UNSIGNED
from botocore.client import Config
import arcticdb as adb
from google.colab import drive, userdata
Read or create the AWS config¶
In [3]:
# mount Google Drive for the config file to live on
drive.mount('/content/drive')
path = '/content/drive/MyDrive/config/awscli.ini'
os.environ['AWS_SHARED_CREDENTIALS_FILE'] = path
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
In [4]:
check = boto3.session.Session()
no_config = check.get_credentials() is None or check.region_name is None
if no_config:
    print('*' * 40)
    print('Setup your AWS S3 credentials and region before continuing.')
    print('https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html')
    print('*' * 40)
Create the config file¶
- You only need to run this section once
- Enter your AWS details below and change write_aws_config_file to True
- Subsequent runs can then use the config file saved in your Drive
In [5]:
aws_access_key = "my_access_key"
aws_secret_access_key = "my_secret_access_key"
region = "my_region"
config_text = f"""
[default]
aws_access_key_id = {aws_access_key}
aws_secret_access_key = {aws_secret_access_key}
region = {region}
"""
write_aws_config_file = False
if write_aws_config_file:
    with open(path, 'w') as f:
        f.write(config_text)
aws_access_key = "my_access_key" aws_secret_access_key = "my_secret_access_key" region = "my_region" config_text = f""" [default] aws_access_key_id = {aws_access_key} aws_secret_access_key = {aws_secret_access_key} region = {region} """ write_aws_config_file = False if write_aws_config_file: with open(path, 'w') as f: f.write(text)
Set up the AWS S3 bucket¶
- First check whether a suitable existing bucket is available
- If there is no suitable existing bucket, create a new one
In [6]:
s3 = boto3.resource('s3')
region = boto3.session.Session().region_name
bucket = [b for b in s3.buckets.all() if b.name.startswith('arcticdb-data-')]
if bucket:
    bucket_name = bucket[0].name
    print('Bucket found:', bucket_name)
else:
    bucket_name = f'arcticdb-data-{uuid4()}'
    s3.create_bucket(Bucket=bucket_name,
                     CreateBucketConfiguration={'LocationConstraint': region})
    print('Bucket created:', bucket_name)
Bucket found: arcticdb-data-bda6914b-2715-4acd-8b52-fa593af295bd
Initialise ArcticDB¶
In [7]:
# create an arcticdb instance in the bucket
arctic = adb.Arctic(f's3://s3.{region}.amazonaws.com:{bucket_name}?aws_auth=true')
if 'btc' not in arctic.list_libraries():
    # library does not already exist
    arctic.create_library('btc', library_options=adb.LibraryOptions(dynamic_schema=True))
library = arctic.get_library('btc')
library
Out[7]:
Library(Arctic(config=S3(endpoint=s3.eu-north-1.amazonaws.com, bucket=arcticdb-data-bda6914b-2715-4acd-8b52-fa593af295bd)), path=btc, storage=s3_storage)
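If you prefer not to rely on aws_auth, the S3 connection string can also carry the credentials directly. A minimal sketch with placeholder values (the keys shown are assumptions, not real credentials):

# Alternative connection: pass the access key and secret in the URI
# rather than using aws_auth=true (placeholder values for illustration)
arctic_explicit = adb.Arctic(
    f's3://s3.{region}.amazonaws.com:{bucket_name}'
    '?access=my_access_key&secret=my_secret_access_key'
)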
Identify the June 2023 BTC blockchain data for processing¶
In [8]:
# create the list of all btc blockchain files
bucket = s3.Bucket('aws-public-blockchain')
objects = bucket.objects.filter(Prefix='v1.0/btc/transactions/')
files = pd.DataFrame({'path': [obj.key for obj in objects]})
In [9]:
# filter only the 2023-06 files to keep run time manageable
files_mask = files['path'].str.contains('2023-06')
to_load = files[files_mask]['path']
print(f"Identified {len(to_load)} / {len(files)} files for processing")
Identified 30 / 5506 files for processing
Read the data in Parquet format from the AWS public dataset¶
This takes a while to run, around 5 to 10 minutes
In [10]:
%%time
df_list = []
for path in tqdm(to_load):
    one_day_df = pd.read_parquet('s3://aws-public-blockchain/' + path,
                                 storage_options={"anon": True},
                                 engine='fastparquet')
    # fixup types from source data
    one_day_df['hash'] = one_day_df['hash'].astype(str)
    one_day_df['block_hash'] = one_day_df['block_hash'].astype(str)
    one_day_df['outputs'] = one_day_df['outputs'].astype(str)
    one_day_df['date'] = pd.to_datetime(one_day_df['date'], unit='ns')
    if 'inputs' in one_day_df.columns:
        one_day_df['inputs'] = one_day_df['inputs'].astype(str)
    # index on timestamp
    one_day_df.set_index('block_timestamp', inplace=True)
    one_day_df.sort_index(inplace=True)
    df_list.append(one_day_df)
df_aws = pd.concat(df_list).sort_index()
print(f"Read and assembled {len(df_aws)} transaction records from AWS")
# release the list to enable garbage collection
df_list = None
100%|██████████| 30/30 [02:50<00:00, 5.67s/it]
Read and assembled 12147125 transaction records from AWS
CPU times: user 31.8 s, sys: 9.58 s, total: 41.4 s
Wall time: 2min 59s
Write the data to ArcticDB¶
In [11]:
%%time
library.write('transactions', df_aws)
CPU times: user 27.2 s, sys: 4.47 s, total: 31.6 s
Wall time: 45.3 s
Out[11]:
VersionedItem(symbol='transactions', library='btc', data=n/a, version=1487, metadata=None, host='S3(endpoint=s3.eu-north-1.amazonaws.com, bucket=arcticdb-data-bda6914b-2715-4acd-8b52-fa593af295bd)')
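As an optional sanity check, the library can be queried for what was just written. A short sketch using the library API:

# Optional check: confirm the symbol exists and inspect its stored description
print(library.list_symbols())
print(library.get_description('transactions'))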
Read the data back from ArcticDB¶
This read also applies a date-range filter, to get 25 days of data starting from 3rd June
In [12]:
%%time
plot_start = datetime(2023, 6, 3, 0, 0)
plot_end = plot_start + timedelta(days=25)
df = library.read('transactions', date_range=(plot_start, plot_end)).data
print(len(df))
10097090
CPU times: user 4.31 s, sys: 3.19 s, total: 7.51 s
Wall time: 21.1 s
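When only part of the data is needed, the read can be narrowed further. A minimal sketch, assuming only the fee column is of interest, using the columns parameter of read():

# Read just the 'fee' column for the same date range to cut memory and IO
df_fees = library.read('transactions',
                       date_range=(plot_start, plot_end),
                       columns=['fee']).data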
Plot a chart of the total transaction fees per day¶
In [13]:
fees_per_day = df.groupby(pd.Grouper(freq='1D')).sum(numeric_only=True)
t = f"BTC Blockchain: Total fees per pay from {plot_start} to {plot_end}"
ax = fees_per_day.plot(kind='bar', y='fee', color='red', figsize=(14, 6), title=t)
ax.figure.autofmt_xdate(rotation=60)
Conclusions¶
- We have presented a simple way to use ArcticDB with AWS for storage
- We saw that reading from ArcticDB is much faster than reading the Parquet files
- We showed how to read a subset of dates from one timeseries data symbol
- If we saved a larger portion of the blockchain data, it could still be stored in one symbol, and subset chunks could be read efficiently (see the sketch below)
- Feel free to use this notebook to experiment with reading larger datasets
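As a rough outline of that idea, later months could be appended incrementally to the same symbol rather than rewritten in one go. A minimal sketch, assuming the files DataFrame from above and hypothetical extra months; in practice the same dtype fixups as in the load cell would be needed before each append:

# Append further months to the existing 'transactions' symbol one day at a time
for month in ['2023-07', '2023-08']:  # hypothetical months, for illustration
    for p in files[files['path'].str.contains(month)]['path']:
        day_df = pd.read_parquet('s3://aws-public-blockchain/' + p,
                                 storage_options={"anon": True},
                                 engine='fastparquet')
        # ... apply the same type fixups as in the load cell above ...
        day_df.set_index('block_timestamp', inplace=True)
        library.append('transactions', day_df.sort_index())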