Parallel Writes
As stated, ArcticDB fundamentally does not support concurrent writes to a single symbol - *unless the data is written concurrently as staged data*!
Staged data is not readable, and a process must finalize the staged data before any of it can be read. Each staged chunk must not overlap with any other staged chunk - as a result, staged data must be timeseries-indexed. The following code uses Spark to write to a single symbol concurrently in parallel, and then finalizes the data.
```python
import glob

import pandas as pd
import pyspark
from pyspark import SparkConf, SparkContext

import arcticdb as adb

# This example assumes the below variables (HOST, BUCKET, ACCESS, SECRET) are validly set
ac = adb.Arctic(f"s3://{HOST}:{BUCKET}?access={ACCESS}&secret={SECRET}")


def _load(work):
    # This function is run in parallel via Spark.
    host, bucket, access, secret, symbol, library, file_path = work
    ac = adb.Arctic(f"s3://{host}:{bucket}?access={access}&secret={secret}")
    library = ac[library]
    df = pd.read_csv(file_path)
    df = df.set_index(df.columns[0])
    df.index = pd.to_datetime(df.index)
    # When staged, the written data is not available to read until finalized.
    library.write(symbol, df, staged=True)


symbol = "my_data"
library = "my_library"

conf = SparkConf().setAppName("appName").setMaster("local")
sc = SparkContext(conf=conf)

# Assumes there is a set of CSV files in the current directory to load from
data = [(HOST, BUCKET, ACCESS, SECRET, symbol, library, f) for f in glob.glob("*.csv")]
dist_data = sc.parallelize(data)

if library not in ac.list_libraries():
    ac.create_library(library)
library = ac[library]

ret = dist_data.map(_load)
ret.collect()

library.finalize_staged_data(symbol)
data = library.read(symbol)
```
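Because staged chunks must not overlap in their time index, it can be useful to verify this before staging. Below is a minimal sketch of such a check using only pandas; the helper name `chunks_overlap` is hypothetical and not part of the ArcticDB API:

```python
import pandas as pd


def chunks_overlap(a: pd.DataFrame, b: pd.DataFrame) -> bool:
    """Return True if the datetime index ranges of two chunks overlap."""
    return a.index.min() <= b.index.max() and b.index.min() <= a.index.max()


# Two chunks with disjoint daily ranges: Jan 1-3 and Jan 4-6
chunk1 = pd.DataFrame({"x": [1, 2, 3]}, index=pd.date_range("2024-01-01", periods=3, freq="D"))
chunk2 = pd.DataFrame({"x": [4, 5, 6]}, index=pd.date_range("2024-01-04", periods=3, freq="D"))

print(chunks_overlap(chunk1, chunk2))  # disjoint ranges -> False
```

Running a pairwise check like this over the chunks (for example, one per CSV file) before calling `library.write(..., staged=True)` catches overlap problems early, rather than at finalization time.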