Amazon Neptuneのトランザクションについて完全に理解した

2021.07.16

あくまで完全に理解したというレベル感です。

Amazon Neptuneのトランザクションについて調査する機会があったのですが、自分がトランザクションに詳しくないこともあり結構大変でした。

ということで、備忘録も兼ねてブログ化しました。

基本はTransaction Semantics in Neptune【公式ドキュメント】に沿った内容になっています。

トランザクションできるの?

まず、Amazon Neptuneでは、Gremlinのようなtx.commit()およびtx.rollback()を使用した手動トランザクションはサポートされていません。

ただ、1つのトラバーサルの開始から終了を1つのトランザクションとして動作させる機能があります。

例えばGremlinの場合ですと、1つのトラバーサルの中で下記の条件を満たす複数のステートメントは1つのトランザクションとして扱うことができます。

  1. 各ステートメントが、セミコロン (;) または改行文字 (\n) で区切りられている
  2. 最後のもの以外の各ステートメントは、next() ステップの実行で終わっている
  • トランザクションとして扱われる例)
g.addV('user').property('name','alice').next();
g.addV('user').property('name','bob');
  • トランザクションとして扱わない例)
g.addV('user').property('name','alice');
g.addV('user').property('name','bob');
g.addV('user').property('name','alice').next()
g.addV('user').property('name','bob');

こちらの内容は公式ドキュメントに記載されています。

トランザクション分離レベルのおさらい

Amazon Neptuneのトランザクションがどのように動作するかを理解する前に、トランザクションの分離レベルのおさらいをしておきます。

毎回調べたり、イマイチ理解しきれていない部分があったので良い学習になりました。

まず、よく聞くDirty ReadFuzzy Read/Non-Repeatable ReadPhantom Readという現象(Phenomena)についておさらいします。

歴史は古く、1992年7月にISO/IEC 9075(標準SQLの公式な名称)のドラフトSQL:1992 Standard にて記述されています。

Dirty Read

  • 原文

1) P1 ("Dirty read"): SQL-transaction T1 modifies a row. SQL-transaction T2 then reads that row before T1 performs a COMMIT. If T1 then performs a ROLLBACK, T2 will have read a row that was never committed and that may thus be considered to have never existed.

  • 直訳

1) P1 ("Dirty read"): SQLトランザクションT1がある行を変更しました。次にT1がCOMMITを実行する前に、T2がその行を読みます。もし、T1がROLLBACKを行った場合、T2はコミットされなかった行を読んだことになり、その行は存在しなかったものとみなされてしまいます。

  • 例)
  1. T1がレコードAをUPDATE (alice -> bob) ※未コミット
  2. T2がレコードAをSELECT (bob)
  3. T1がレコードAをロールバック (bob -> alice)

結果として、T2は存在しない値を読んでしまうという現象(Phenomena)

Fuzzy Read/Non-Repeatable Read

  • 原文

2) P2 ("Non-repeatable read"): SQL-transaction T1 reads a row. SQL-transaction T2 then modifies or deletes that row and performs a COMMIT. If T1 then attempts to reread the row, it may receive the modified value or discover that the row has been deleted.

  • 直訳

P2 ("Non-repeatable read"): SQL トランザクション T1 は、ある行を読み込みます。SQL トランザクション T2 は、その行を修正または削除し、COMMIT を実行します。その後、T1がその行を読み直そうとすると、修正された値を受け取ったり、その行が削除されていることに気づくかもしれません。

  1. T1でレコードAをSELECT (alice)
  2. T2でレコードAをUPDATE (alice -> bob) ※コミット
  3. T1でレコードAを再度SELECT (bob)

結果として、T1内で同じレコードを読みだしたのにタイミングによって異なる結果になってしまう現象(Phenomena)

一見当たり前のように思えますが、ACIDのIsolation(トランザクション処理が他のトランザクション処理の影響を受けない性質)という観点で考えると高いとは言えません。

これを防ぐには、T1で共有ロック(他のトランザクションによる読出はOK)を利用する方法があります。

Phantom Read

  • 原文

3) P3 ("Phantom"): SQL-transaction T1 reads the set of rows N that satisfy some . SQL-transaction T2 then executes SQL-statements that generate one or more rows that satisfy the used by SQL-transaction T1. If SQL-transaction T1 then repeats the initial read with the same , it obtains a different collection of rows.

  • 直訳

P3 ("Phantom"): SQL-トランザクションT1は、ある<検索条件>を満たす行の集合Nを読み込みます。その後、SQL-transaction T2は、SQL-transaction T1が使用した<検索条件>を満たす1つ以上の行を生成するSQL-statementを実行します。その後、SQL-transaction T1が同じ<検索条件>で最初の読み込みを繰り返すと、異なる行の集合を得ることができます。

  1. T1でuserをSELECT ALL ({alice})
  2. T2でuserテーブルにcarolをINSERT ※コミット
  3. T1でuserを再びSELECT ALL ({alice, carol})

結果として、T1内で同じ範囲のレコードを読みだしたのにタイミングによって異なる結果になってしまう現象(Phenomena)

これを防ぐには、T1でテーブル全体に共有ロック(他のトランザクションによる読出はOK)を利用する方法があります。

トランザクション分離レベル4つ

上記のような、複数のTXを同時実行した場合に発生する現象に対応して、DBMSでどの程度の正確性(Correctness)一貫性(Consistency)を持たせるか4段階で定義したものです。

ACID特性のI(Isolation:分離性)に関する概念です。

  • READ UNCOMMITTED: Dirty Read、Fuzzy Read、Phantom Readをすべて許容
  • READ COMMITTED: Dirty Readは防ぐが、Fuzzy ReadとPhantom Readは許容
  • REPEATABLE READ: Dirty ReadとFuzzy Readは防ぐが、Phantom Readは許容
  • SERIALIZABLE: Dirty Read、Fuzzy Read、Phantom Readをすべて防ぐ

まとめると

分離レベル Dirty Read Fuzzy Read Phantom Read
READ UNCOMMITTED
READ COMMITTED ×
REPEATABLE READ × ×
SERIALIZABLE × × ×

まとめるとこんな感じのテーブルになります。 よく見る図ですね。

より深く

と、ここまではよくある話なのですが、

実際には上記のパターンでは不十分ということで、より細かな現象(Phenomena)、異常(Anomaly)、分離レベル(Isolation Level)が提唱されていたりします。

Snapshot Isolationの定義を調べていくと、最終的に以下の論文に辿り着くことが多いようです。

A Critique of ANSI SQL Isolation Levels

ここらへんは専門外のためすべてを理解するのは避けますが、Snapshot Isolationについてはこの後関係してくるため、少し解説します。

Snapshot Isolationは次の性質を持ちます。(Wiki参照)

  • トランザクションは一貫性のあるデータベースのスナップショット(トランザクション開始時に存在していた最後にコミットされた値)を読む

  • トランザクションの更新がスナップショット以降に他のトランザクションがコミットした更新と競合しない場合に限りトランザクションが成功する

実際には、Snapshot IsolationはMVCC(MultiVersion Concurrency Control)で実現されることが多いです。

MVCCとは、データベースが複数のトランザクションを受け取った場合でも、同時並行性と一貫性の両方を保証する仕組みです。

MVCCでは、トランザクションがデータを更新する必要がある場合、新しいデータで上書きするのではなく、新しいバージョンのデータを作成します。

各トランザクションが見ることのできるバージョンは、実装されている分離レベルによって異なります。

分離レベルがSnapshot Isolationの場合は、トランザクションはトランザクションの開始時点のデータを見ることができます。

このため、ロックを必要とせず、互いに分離した処理を行うことができるため、同時並列性が向上します。

ちなみに、MVCCとSnapshot Isolationは混同されがちですが、MVCCの方がもっと広い概念らしいです。

せっかく分離レベルについていろいろ調べたので、Dockerで立ち上げたMySQLで適当なテーブルを作ってDirty Read / Fuzzy Read / Phantom Readの現象を試してみます。

ちなみにデフォルトの設定では、グローバル/セッションともにREPEATABLE READとなっています。

MySQLの設定はこちらが参考になります。

  • Table Schema

  • Default Isolation Level

  • Dirty Read

  • Fuzzy Read

Neptuneでのトランザクション分離レベル

前置きが長くなりましたが、いよいよAmazon Neptuneのトランザクション分離レベルについてです。

Neptuneのトランザクション分離レベルは、クエリがRead-OnlyMutationのどちらに該当するかで異なります。

詳細はTransaction Semantics in Neptuneを確認してください。

例えばGremlinの場合は、クエリの中にaddE(),addV(),drop()のようなデータ操作が含まれる場合はMutaion Query、それ以外はRead-Only Queryという扱いになります。

読み取り専用クエリの分離レベル

Amazon Neptuneの読み取り専用クエリはSnapshot Isolationで動作します。

つまり、Dirty Read/Fuzzy Read/Phantom Readのいずれも発生しません。

Snapshot IsolationはMVCCで実現されます。

このため、読み取り専用クエリはデータをロックすることがなく、ミューテーションクエリをブロックすることもありません。

唯一の考慮事項は、リードレプリカを利用している際に、ライターで行われた更新がリードレプリカに反映されるまで少し時間がかかるケースがあることです。

Snapshot Isolationのため一貫性のある読み取りは保証されるため基本的に問題にはならないのですが、どうしても最新のデータほしいというユースケースにおいてはwriterエンドポイントにクエリを送信するという方法があるそうです。

ミューテーションクエリの分離レベル

ミューテーションクエリの一部として行われるReadはREAD COMMITTEDで実行されるため、Dirty Readは発生しません。

そして、データの読み取り時にレコードやレコードの範囲をロックするため、Fuzzy ReadやPhantom Readといった現象が起こらないという強力な保証を提供してくれています。

Amazon Neptuneがどの範囲をどのようにロックするのか?を知るには、Neptuneのグラフデータモデルインデックスについて正しく理解する必要があります。

Amazon Neptuneのグラフデータモデルは、下記の4つのポジション(position)と呼ばれる要素から構成されています。

  • subject:主語 (S)
  • predicate:述語 (P)
  • object:目的語 (O)
  • graph:グラフ (G)

Amazon Neptuneはこのポジションの組み合わせから成るインデックスを使ってクエリを解決します。

4つのポジションには計16(2^4)のアクセスパターンがありますが、6つのインデックスを利用すればスキャンやフィルタリングを行うことなく16パターンすべてに効率的に問い合わせることができます。

詳しくは公式ドキュメントのNeptune Graph Data Modelに記載されています。

RDFなどの知見がある方は割と簡単に理解できるのかもしれませんが、私には難しかったです。

Deep dive on Amazon Neptuneで解説している箇所が比較的分かりやすかったので抜粋しておきます。

重要な点として、Amazon Neptuneではデフォルトで下記3つのインデックスのみをサポートしています。

  • SPOG: Subject + Predicate + Object + Graph
    • 頂点(Subject)や頂点+プロパティ識別子などなど前方のポジションがバインドされている場合は、効率的なアクセスが可能
  • POGS: Predicate + Object + Graph + Subject
    • Pポジションに保存されているエッジやラベルがバインドされている場合は、効率的なアクセスが可能
  • GPSO: Graph + Predicate + Subject + Object
    • グラフ(またはエッジID)とプロパティ識別子がバインドされている場合は、効率的なアクセスが可能

コンフリクトの解決

Amazon Neptuneは、最初のトランザクションがロックした範囲のレコードを2番目のトランザクションが変更しようとした場合、直ちにコンフリクトを検出し、2番目のトランザクションをブロックします。

ただし、デッドロックが検出されるかどうかで動作が少し変わります。

  • デッドロックが検出されない場合
    • 自動的にlock-wait timeoutを開始し、ロックされたトランザクションはロックを保持しているトランザクションがロックを解放するまで最大60秒間待機する
    • 時間内にロックが解放された場合、2つ目のトランザクションのブロックは解除され、処理が再開される
    • タイムアウトした場合、ブロックされた2つ目のトランザクションはロールバックされる
  • デッドロックを検出した場合
    • lock-wait timeoutを開始することなく、2つ目のトランザクションを直ちにキャンセルし、ロールバックする

Neptune トランザクションセマンティクスの例

こちらを参考にいくつかGremlinのクエリの実行を試したいと思います。

セットアップ

以前書いたブログがあるのでこちらを参考にしてください。

Example 1 – Inserting a Property Only If It Does Not Exist

クレジットカードの信用スコアの更新を想定したクエリです。

複数のトランザクションによる同時更新を避けたい場合にはCoalesce Stepが使えます。

coalesce()は最初に第1引数を実行し、失敗した場合に第2引数を実行します。

  • Gremlin Query
g.V('person1').hasLabel('Person').coalesce(has('creditScore'), property('creditScore', 'AAA+'))
  • Profile API Response
*******************************************************
                Neptune Gremlin Profile
*******************************************************

Query String
==================
g.V("person1").hasLabel("Person").coalesce(has("creditScore"), property("creditScore", "AAA+"))

Original Traversal
==================
[GraphStep(vertex,[person1]), HasStep([~label.eq(Person)]), CoalesceStep([[TraversalFilterStep([PropertiesStep([creditScore],value)])], [AddPropertyStep({value=[AAA+], key=[creditScore]})]])]

Optimized Traversal
===================
Neptune steps:
[
    NeptuneGraphQueryStep(Vertex) {
        JoinGroupNode {
            PatternNode[(?1=<person1>, <~label>, ?2=<Person>, <~>) . project ?1 .], {estimatedCardinality=1, indexTime=0, joinTime=1, numSearches=1}
            CoalesceNode {
                PatternNode[(?1, ?3=<creditScore>, ?4, <~>) . project ask .], {estimatedCardinality=0, indexTime=0, joinTime=0, numSearches=1}
                InsertVertexPropertyNode {vertexId=?1, key=<creditScore>, value="AAA+", cardinality=set . project ?1}
            } bind (?1, ?1) as ?5
        }, annotations={path=[Vertex(?1):GraphStep, Vertex(?5):CoalesceStep], joinStats=true, optimizationTime=1, readForUpdate=true, maxVarId=6, executionTime=2}
    },
    NeptuneTraverserConverterStep
]

Physical Pipeline
=================
NeptuneGraphQueryStep
    |-- StartOp
    |-- JoinGroupOp
        |-- SpoolerOp(1000)
        |-- DynamicJoinOp(PatternNode[(?1=<person1>, <~label>, ?2=<Person>, <~>) . project ?1 .], {estimatedCardinality=1})
        |-- CoalesceOp

Runtime (ms)
============
Query Execution: 1.893

Traversal Metrics
=================
Step                                                               Count  Traversers       Time (ms)    % Dur
-------------------------------------------------------------------------------------------------------------
NeptuneGraphQueryStep(Vertex)                                          1           1           1.609    95.40
NeptuneTraverserConverterStep                                          1           1           0.077     4.60
                                            >TOTAL                     -           -           1.687        -

Predicates
==========
# of predicates: 1

Results
=======
Count: 1
Output: [v[person1]]


Index Operations
================
Query execution:
    # of statement index ops: 3
    # of unique statement index ops: 3
    Duplication ratio: 1.0
    # of terms materialized: 0
    # of statements added: 1
    # of statements removed: 0

Index operations (query execution):
SearchEvent(?1=<person1>, <~label>, ?2=<Person>, <~> . project ?1 .)
AskEvent(?1=<person1>, ?3=<creditScore>, ?4, <~> .)
RangeCountEvent(<person1>, <~label>, ?, ? ., accurate=true)
StatementEvent[Added(<person1> <creditScore> "AAA+" <~>) .]

この例では、トランザクションは最初にperson1creditScoreプロパティを読み取る必要があります。

ミューテーションクエリのロック機能により、トランザクションはこの読み取り処理でSPOGインデックスのS=person1とP=creditScoreの範囲をレンジロックします。

そのため仮に、2つのトランザクション(T1, T2)が同時に実行された場合でも、片方のトランザクションはロック解放待ちになるため、競合する同時更新が発生しなくなります。

Example 2 – Asserting That a Property Value Is Globally Unique

社会保障番号を登録を想定したクエリです。

主キーとして同じIDが存在しないことを保証したいケースにおいてもCoalesce Stepが使えます。

  • Gremlin Query
g.V().has('ssn', 123456789).fold().coalesce(__.unfold(),__.addV('Person').property('name', 'John Doe').property('ssn', 123456789))
  • Profile API Response
*******************************************************
                Neptune Gremlin Profile
*******************************************************

Query String
==================
g.V().has("ssn", 123456789).fold().coalesce(__.unfold(),__.addV("Person").property("name", "John Doe").property("ssn", 123456789))

Original Traversal
==================
[GraphStep(vertex,[]), HasStep([ssn.eq(123456789)]), FoldStep, CoalesceStep([[UnfoldStep], [AddVertexStep({name=[John Doe], label=[Person], ssn=[123456789]})]])]

Optimized Traversal
===================
Neptune steps:
[
    NeptuneGraphQueryStep(Vertex) {
        JoinGroupNode {
            CoalesceNode {
                JoinGroupNode {
                    PatternNode[(?1, <ssn>, ?5, ?) . project distinct ?1 . ContainsFilter(?5 in (123456789^^<INT>, 123456789^^<LONG>, 1.23456792E8^^<FLOAT>, 1.23456789E8^^<DOUBLE>)) .], {estimatedCardinality=0, indexTime=0, joinTime=0, numSearches=1}
                }
                JoinGroupNode {
                    Bind {ValueExpression.SupplierExpr(supplier=RandomVertexIdSupplier) as ?3}
                    InsertVertexNode {vertexId=?3, label=?4=<Person> . project ?3,?4}
                    InsertVertexPropertyNode {vertexId=?3, key=<name>, value="John Doe", cardinality=set . project ?3}
                    InsertVertexPropertyNode {vertexId=?3, key=<ssn>, value=123456789^^<INT>, cardinality=set . project ?3}
                }
            } bind (?1, ?3) as ?3
        }, annotations={path=[Vertex(?3):AddVertexStep], joinStats=true, optimizationTime=1, readForUpdate=true, maxVarId=6, executionTime=1}
    },
    NeptuneTraverserConverterStep
]

Physical Pipeline
=================
NeptuneGraphQueryStep
    |-- StartOp
    |-- JoinGroupOp
        |-- CoalesceOp

Runtime (ms)
============
Query Execution: 1.577

Traversal Metrics
=================
Step                                                               Count  Traversers       Time (ms)    % Dur
-------------------------------------------------------------------------------------------------------------
NeptuneGraphQueryStep(Vertex)                                          1           1           1.369    95.65
NeptuneTraverserConverterStep                                          1           1           0.062     4.35
                                            >TOTAL                     -           -           1.431        -

Predicates
==========
# of predicates: 2

Results
=======
Count: 1
Output: [v[54bd5518-ca2b-20c9-ee11-4763f5e5c1bc]]


Index Operations
================
Query execution:
    # of statement index ops: 1
    # of unique statement index ops: 1
    Duplication ratio: 1.0
    # of terms materialized: 0
    # of statements added: 3
    # of statements removed: 0

Index operations (query execution):
SearchEvent(?1, <ssn>, ?5, ? . project distinct ?1 . filter(?5 in (1.23456792E8^^<FLOAT>, 123456789^^<LONG>, 1.23456789E8^^<DOUBLE>, 123456789^^<INT>)) .)
StatementEvent[Added(<54bd5518-ca2b-20c9-ee11-4763f5e5c1bc> <~label> <Person> <~>) .]
StatementEvent[Added(<54bd5518-ca2b-20c9-ee11-4763f5e5c1bc> <name> "John Doe" <~>) .]
StatementEvent[Added(<54bd5518-ca2b-20c9-ee11-4763f5e5c1bc> <ssn> 123456789^^<INT> <~>) .]

1つ前の例と似ていますが、主な違いはレンジロックがSPOGインデックスではなくPOGSインデックスに対して行われることです。

トランザクションは最初にPとOの位置がバインドされたパターン、?person :ssn 123456789を読み取る必要があります。

仮に、データが存在する場合にはPOGSインデックスのP=ssn、O=123456789にレンジロックがかかり、他のトランザクションによる処理はブロックされます。

一方で、条件に一致するデータが存在する場合は特に何も行われません。

Example 3 – Changing a Property If Another Property Has a Specified Value

ゲーム内でレベルアップした際、ある特定のパラメーターと値を追加するクエリです。

ゲーム内では様々なイベントにおいてレベルが上がるため、レベルアップ時の初期化処理が競合しないようにしたいというケースです。

propertyでsingleを指定すると、新しいプロパティが追加されるか、既存のプロパティの値が更新されるか、のどちらかの動作になります。

  • Gremlin Query
g.V('person1').hasLabel('Person').has('level', 1).property('level2Score', 0).property(single, 'level', 2)
  • Profile API Response
*******************************************************
                Neptune Gremlin Profile
*******************************************************

Query String
==================
g.V("person1").hasLabel("Person").has("level", 1).property("level2Score", 0).property(single, "level", 2)

Original Traversal
==================
[GraphStep(vertex,[person1]), HasStep([~label.eq(Person), level.eq(1)]), AddPropertyStep({value=[0], key=[level2Score]}), AddPropertyStep({value=[2], key=[level]})]

Optimized Traversal
===================
Neptune steps:
[
    NeptuneGraphQueryStep(Vertex) {
        JoinGroupNode {
            PatternNode[(?1=<person1>, <level>, ?3, ?) . project distinct ?1 . ContainsFilter(?3 in (1^^<SBYTE>, 1^^<SHORT>, 1^^<INT>, 1^^<LONG>, 1.0^^<FLOAT>, 1.0^^<DOUBLE>)) .], {estimatedCardinality=0, indexTime=0, joinTime=0, numSearches=1, actualTotalOutput=0}
            PatternNode[(?1, <~label>, ?2=<Person>, <~>) . project ask .], {estimatedCardinality=1}
            InsertVertexPropertyNode {vertexId=?1, key=<level2Score>, value=0^^<INT>, cardinality=set . project ?1}
            InsertVertexPropertyNode {vertexId=?1, key=<level>, value=2^^<INT>, cardinality=single . project ?1}
        }, annotations={path=[Vertex(?1):GraphStep], joinStats=true, optimizationTime=2, readForUpdate=true, maxVarId=4, executionTime=1}
    },
    NeptuneTraverserConverterStep
]

Physical Pipeline
=================
NeptuneGraphQueryStep
    |-- StartOp
    |-- JoinGroupOp
        |-- SpoolerOp(1000)
        |-- DynamicJoinOp(PatternNode[(?1=<person1>, <level>, ?3, ?) . project distinct ?1 . ContainsFilter(?3 in (1^^<SBYTE>, 1^^<SHORT>, 1^^<INT>, 1^^<LONG>, 1.0^^<FLOAT>, 1.0^^<DOUBLE>)) .], {estimatedCardinality=0, indexTime=0, joinTime=0, numSearches=1, actualTotalOutput=0})
        |-- SpoolerOp(1000)
        |-- DynamicJoinOp(PatternNode[(?1, <~label>, ?2=<Person>, <~>) . project ask .], {estimatedCardinality=1})
        |-- BarrierOp()
        |-- GremlinInsertOp
        |-- BarrierOp()
        |-- GremlinInsertOp

Runtime (ms)
============
Query Execution: 0.968

Traversal Metrics
=================
Step                                                               Count  Traversers       Time (ms)    % Dur
-------------------------------------------------------------------------------------------------------------
NeptuneGraphQueryStep(Vertex)                                                                  0.493    97.16
NeptuneTraverserConverterStep                                                                  0.014     2.84
                                            >TOTAL                     -           -           0.507        -

Predicates
==========
# of predicates: 4

Results
=======
Count: 0
Output: []


Index Operations
================
Query execution:
    # of statement index ops: 1
    # of unique statement index ops: 1
    Duplication ratio: 1.0
    # of terms materialized: 0

Index operations (query execution):
SearchEvent(?1=<person1>, <level>, ?3, ? . project distinct ?1 . filter(?3 in (1^^<SBYTE>, 1.0^^<DOUBLE>, 1.0^^<FLOAT>, 1^^<LONG>, 1^^<INT>, 1^^<SHORT>)) .)

この例では、トランザクションは最初にg.V('person1').hasLabel('Person').has('level', 1)でデータを読み込む必要があります。

この際、SPOGインデックスの S=person1、P=level、O=1 の範囲にレンジロックがかかります。

このロックにより、競合するトランザクションによる同時更新が発生しなくなります。

さいごに

TXはとても奥が深く自分の未熟さを痛感しました。

ただ、その分学びも多くとても面白かったです。

中でも下記のサイトは非常に参考になりました。

感謝致します。

参考にさせて頂いたサイト