
Parallel Writes

As outlined, ArcticDB fundamentally does not support concurrent writes to a single symbol - *unless the data is written concurrently as staged data*!

Staged data is not readable: a process must finalize the staged data before any of it can be read. Each staged unit of data must not overlap with any other staged unit - consequently, staged data must be timeseries-indexed. The following code uses Spark to write concurrently to a single symbol in parallel, and then finalizes the data.
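To illustrate the non-overlap requirement, here is a minimal sketch (pandas only, no ArcticDB calls) of checking that a set of chunks destined for staging cover disjoint time ranges. The helper name `chunks_overlap` is hypothetical; in practice ArcticDB itself enforces this when the staged data is finalized.

```python
import pandas as pd

def chunks_overlap(chunks):
    # Hypothetical helper: returns True if any two chunks' time ranges
    # overlap. Each staged unit must cover a disjoint index range.
    spans = sorted((c.index.min(), c.index.max()) for c in chunks)
    return any(prev_end >= start
               for (_, prev_end), (start, _) in zip(spans, spans[1:]))

a = pd.DataFrame({"x": [1, 2]}, index=pd.to_datetime(["2024-01-01", "2024-01-02"]))
b = pd.DataFrame({"x": [3, 4]}, index=pd.to_datetime(["2024-01-03", "2024-01-04"]))
c = pd.DataFrame({"x": [5]}, index=pd.to_datetime(["2024-01-02"]))

print(chunks_overlap([a, b]))  # False: disjoint ranges can be staged safely
print(chunks_overlap([a, c]))  # True: overlapping ranges cannot be finalized
```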

import glob

import pandas as pd
import arcticdb as adb
from pyspark import SparkConf, SparkContext

# This example assumes the below variables (HOST, BUCKET, ACCESS, SECRET) are validly set
ac = adb.Arctic(f"s3://{HOST}:{BUCKET}?access={ACCESS}&secret={SECRET}")

def _load(work):
    # This function is run in parallel via Spark.
    host, bucket, access, secret, symbol, library, file_path = work
    ac = adb.Arctic(f"s3://{host}:{bucket}?access={access}&secret={secret}")

    library = ac[library]

    df = pd.read_csv(file_path)
    df = df.set_index(df.columns[0])
    df.index = pd.to_datetime(df.index)

    # When staged, the written data is not available to read until finalized.
    library.write(symbol, df, staged=True)

symbol = "my_data"
library = "my_library"

if library not in ac.list_libraries():
    ac.create_library(library)

conf = SparkConf().setAppName("appName").setMaster("local")
sc = SparkContext(conf=conf)

# Assumes there is a set of CSV files in the current directory to load from
data = [(HOST, BUCKET, ACCESS, SECRET, symbol, library, f) for f in glob.glob("*.csv")]
dist_data = sc.parallelize(data)

ret = dist_data.map(_load)
ret.collect()

library = ac[library]
library.finalize_staged_data(symbol)

data = library.read(symbol)