🪐

Neptune GraphDB における ETL 実装

2023/11/30に公開

こんにちは。インフラチームの山岸 (@yamagishihrd) です。
本記事は SimpleForm Advent Calendar 2023 の1日目です。

皆さん、グラフデータベースは利用されているでしょうか?
当社でも最近 Amazon Neptune という AWS マネージドのグラフデータベースを導入しましたので、今回は Neptune における ETL の実装方法についてご紹介したいと思います。

概要

  • シンプルフォームでは、グラフ DB を用いた機能を提供しています。グラフ DB の基盤として Amazon Neptune を使用しています。
  • Neptune には、データをバッチで投入するためのエンドポイント(= Bulk Loader エンドポイント)が提供されています。本記事の中で仕様や利用方法について解説します。
  • また、Neptune Bulk Loader エンドポイントを利用した ETL 実装についても紹介します。

前提知識

本題に入る前に、本記事で前提となる知識について簡単に述べたいと思います。

グラフデータベースについて

グラフデータベースとは、データをグラフの形で格納し、管理するためのデータベースです。(ここでいう "グラフ" とは、数学的グラフ理論の文脈における用語です)

グラフの基本要素

グラフデータベースが格納するデータの種類として以下のようなものがあります。

  • ノード(頂点) ... 人や場所、物といった、エンティティを表現します。
  • エッジ(辺) ... 友達関係、所有関係、階層関係といった、ノード間の関係性を表現します。
  • プロパティ(属性) ... 名前やタイプ、重みといった、ノードやエッジに対して付与する属性を表現します。(プロパティを持つグラフのことを「プロパティグラフ」と呼びます)

グラフデータの表現方法

主なグラフデータの表現方法として「プロパティグラフ」と「RDF」があります。

  • プロパティグラフ ... ノードとエッジの両方にプロパティを Key-Value 形式で持たせることが可能です。また、ノードとエッジにはラベルを付与することができ、これにより異なるタイプのエンティティや関係性を区別できます。より直感的で理解しやすく、柔軟なデータモデリングが可能です。
  • RDF (Resource Description Framework) ... リソース間の関係を記述するための W3C 標準のデータモデルです。"主語-述語-目的語" からなる「トリプル」という概念を使用してリソース間の関係性を表現します。リソースには URI (Uniform Resource Identifier) という一意な識別子が付与されます。

商業的なグラフデータベースとしてはプロパティグラフが利用されることが多いものの、相互運用性や標準化が重要な場面においては RDF が利用されます。

ユースケース

グラフデータは多対多の関係性を表現するのに向いており、ソーシャルグラフやレコメンデーションなど様々な分野で用いられます。

RDB でもこのようなデータを表現できないことはありませんが、多対多を直接表現することはできず、関連テーブルを間に挟む必要があります。これらのテーブルを結合してエンティティ間の関連性を調べようとすると計算コストが高くなり、パフォーマンス上の問題が生じやすいです。このような場面において、グラフデータベースはパフォーマンスに優れ、より直感的なクエリでデータを操作することができます。

(出典 : 【AWS Black Belt Online Seminar】Amazon Neptune - YouTube)

Amazon Neptune について

Amazon Neptune は AWS が提供するグラフデータベースのマネージドサービスです。以下のような特徴を持ちます。

  • OLTP 処理に最適化されており、毎秒 100,000 以上の高速なクエリ実行をサポートするように設計されています。
  • クラスターあたり最大 15 のリードレプリカを持つことができ、最大 128 TiB までストレージを拡張できます。
  • 索引(インデックス)の管理も Neptune 側で行われ、クエリ実行計画やプロファイルの取得が可能になっています。

以下のように様々な機能が提供されています。

  • Neptune Serverless ... Aurora や Redshift 同様、Serverless のオプションが存在します。ワークロード需要が変動しやすく、弾力性が重視されるような場合に有効な選択肢になります。(ただし、ワークロードが全くない状態でも最小 2.5 NCU に対するコストが発生します)
  • Neptune ML ... 分類や回帰、リンク予測といった、いくつかのタイプの推論タスクがサポートされています。機械学習モデルとして「グラフニューラルネットワーク」と「ナレッジグラフ埋め込みモデル」が利用できます。
  • Neptune Streams ... グラフに対する変更イベントをキャプチャでき、通知や他データストアへの保存といったユースケースに利用できます。
  • 全文検索 ... Amazon OpenSearch Service と組み合わせて、全文検索を用いたノード・エッジ検索ができます。(OpenSearch ドメインに 1 ノード = 1 ドキュメント、1 エッジ = 1 ドキュメントとして保存しておく必要があります)

(2024/3/4 追記)Neptune ML に関する記事を投稿しました。良ければ併せてご覧ください。

https://zenn.dev/simpleform_blog/articles/20240304-01-neptune-ml-recommendation

アーキテクチャ概要

本記事で解説していくアーキテクチャの全体像について以下に示します。

RDB (Amazon Aurora) に存在するデータをもとにグラフデータを生成し、Neptune クラスターに投入するシナリオを想定します。Glue ジョブを使用して、RDB 上のデータを読み取り、バルクロード用グラフデータとして Neptune が想定する形式に加工します。S3 に保存された加工済みデータを利用して、Lambda 関数から Neptune に対するバルクロードを実行します。この一連の処理を AWS Step Functions (SFN) を利用してワークフロー化します。

Neptune へのデータロード方式

Neptune クラスターに対してデータをロードするには、いくつかの方法が用意されています。

  • SPARQL の INSERT ステートメントや、Gremlin の addV(), addE() といったクエリ言語でデータを挿入していく
  • Neptune Bulk Loader エンドポイントを使用して、S3 上の外部ファイルが持つ大量データを取り込む [1]
  • AWS Database Migration Service (DMS) を使用して、他のデータストアからデータをインポートする [2]

RDB のような既存のデータソースからグラフ DB をバッチ的に構築する場合、Bulk Loader や DMS を使用するのが現実的な選択肢になるかと思います。本記事では、Bulk Loader を利用する方法について扱います。

Neptune Bulk Loader エンドポイント

Neptune クラスターをデプロイすると、以下のような形式の Writer エンドポイントが作成されます。接続ポートはデフォルトで 8182 です。

  • XXXXXXXX.cluster-XXXXXXXX.ap-northeast-1.neptune.amazonaws.com

Neptune Bulk Loader エンドポイント URL は以下で提供されます。このエンドポイントに対して、POST / GET / DELETE リクエストを送ることができます。これらについて後述します。

  • XXXXXXXX.cluster-XXXXXXXX.ap-northeast-1.neptune.amazonaws.com:8182/loader


Neptune Bulk Loader エンドポイントのイメージ

POST リクエスト - ロードジョブの生成

POST リクエストを送ることで、新規のロードジョブを生成できます。[3]

Python であれば、例えば以下のようにリクエストを送ります。POST に成功すると一意なロードジョブ ID が付与され、レスポンスの中身から確認することができます。

data = {
    "source": f"s3://{BUCKET_NAME}/{prefix}",
    "format": "csv",
    "iamRoleArn": NEPTUNE_CLUSTER_ROLE_ARN,
    "region": "ap-northeast-1",
    "failOnError": "FALSE",
    "parallelism": "MEDIUM",
    "updateSingleCardinalityProperties": "TRUE",
    "queueRequest": "TRUE",
    "dependencies": []
}

response = requests.post(
    url=LOADER_ENDPOINT_URL,
    headers={"Content-Type": "application/json"},
    json=data,
)
  • "source" ... ロード対象となるデータを持つファイルの S3 ロケーションを指定します。(ロードデータの生成については後述します)単一ファイルの S3 Key を指定することもできますし、複数ファイルが格納された S3 Prefix を指定することも可能です。
  • "updateSingleCardinalityProperties" ... SingleCardinality(後述)である場合に、既存の属性値を上書きするか否かを指定します。
  • "dependencies" ... 生成するロードジョブが依存するロードジョブがある場合、それらの ID をリストとして指定することで、実行のタイミングが考慮されます。明示的な指定がない場合、FIFO(先入先出)で処理されます。

GET リクエスト - Get-Status API

GET リクエストを送ることで、生成済みのロードジョブの状態を確認できます。これは Get-Status API とも呼ばれます。[4]

# 特定のロード ID に関する情報を取得するには loadId を指定します
GET https://YOUR_NEPTUNE_ENDPOINT:8182/loader?loadId=loadId
GET https://YOUR_NEPTUNE_ENDPOINT:8182/loader/loadId

# ロード ID を指定しない場合、ロード ID のリストが返されます
GET https://YOUR_NEPTUNE_ENDPOINT:8182/loader

detailserrors といったリクエストパラメータが用意されているので、これらのオプションを有効にすることでより詳細な情報を取得することができます。[5][6]

一般的なレスポンス構造
{
    "status": "200 OK",
    "payload": {
        "feedCount": [
            {
                "LOAD_FAILED": "number"
            }
        ],
        "overallStatus": {
            "fullUri": "s3://bucket/key",
            "runNumber": "number",
            "retryNumber": "number",
            "status": "string",
            "totalTimeSpent": "number",
            "startTime": "number",
            "totalRecords": "number",
            "totalDuplicates": "number",
            "parsingErrors": "number",
            "datatypeMismatchErrors": "number",
            "insertErrors": "number",
        },
        "failedFeeds": [
            {
                "fullUri": "s3://bucket/key",
                "runNumber": "number",
                "retryNumber": "number",
                "status": "string",
                "totalTimeSpent": "number",
                "startTime": "number",
                "totalRecords": "number",
                "totalDuplicates": "number",
                "parsingErrors": "number",
                "datatypeMismatchErrors": "number",
                "insertErrors": "number",
            }
        ],
        "errors": {
            "startIndex": "number",
            "endIndex": "number",
            "loadId": "string",
            "errorLogs": [
                {
                    "errorCode": "string",
                    "errorMessage" : "string",
                    "fileName": "string",
                    "recordNum": "number"
                }
            ]
        }
    }
}

生成したジョブが実行完了したかどうかの確認や、失敗時にその原因に関する情報を得る場合などに有用です。例えば、ロードデータ生成用の ETL ジョブにミスがあって、Edge が指定する Vertex のいずれかが存在していない場合、errors を有効にすることで以下のようなエントリを確認できます。

{
    "errorCode": "FROM_OR_TO_VERTEX_ARE_MISSING",
    "errorMessage": "Either from vertex, '01fc5e7e2e17sxsbbehv4v40md', or to vertex, '0b3d04cf2518c5bff0a3799e05d282bf', is not present.",
    "fileName": "s3://example-bucket/some_key",
    "recordNum": 0
}

DELETE リクエスト - ロードジョブのキャンセル

DELETE リクエストを送ることで、実行中または実行開始前のロードジョブをキャンセルできます。[7]

DELETE https://YOUR_NEPTUNE_ENDPOINT:8182/loader?loadId=loadId
DELETE https://YOUR_NEPTUNE_ENDPOINT:8182/loader/loadId

キャンセルされたロードジョブのステータスは LOAD_CANCELLED_BY_USER になります。[8]

ロードデータの生成

本記事では Apache TinkerPop / Gremlin ロードデータ形式について紹介します。[9]openCypher, RDF については公式ドキュメントにてご確認ください)

Gremlin の場合、CSV 形式でデータを用意します。データ転送量削減のため、gzip 圧縮がサポートされています。単一ファイル内に Vertex データと Edge データの両方を含めることはできませんが、単一ファイル内に異なるラベルのデータを持たせることは可能です。(ただし ETL の実装上、異なるラベルはファイルも分けておくのが無難だと思います)

ロードデータを生成するための ETL 処理は、AWS であれば Glue ジョブを利用するのが良いと思います。(ロードジョブの POST リクエストにおいて、"source" が Key だけでなく Prefix もサポートしているため、ファイルサイズの調整のためにパーティション分割しても問題ありません)

Vertex データ形式

以下は Vertex のサンプルデータです。~id, ~label が必須のカラムです。その名の通り、それぞれ Vertex を識別するユニークな ID およびラベルを意味します。(簡単のため a, b, ... としておりますが、DB におけるサロゲートキーなど、ユニークになるものを選択します)

Vertex サンプルデータ
~id, ~label, name:String(single)
a, user, Alice
b, user, Bob
c, user, Charlie

必須カラム以外はプロパティになります。プロパティカラムの場合、プロパティ名とともに : の後にデータ型を指定します。

propertyname:type(cardinality)

Edge データ形式

続いて Edge のサンプルデータです。~id, ~label に加えて、~from, ~to が必須のカラムになります。その名の通り、どの Vertex ID 同士を結ぶか指定します。指定する Vertex ID は、Edge データのバルクロード実行時点で存在している必要があります。(存在しない場合、insertErrors としてカウントされます)Vertex データ形式同様、プロパティを持たせることも可能です。

Edge サンプルデータ
~id, ~from, ~to, ~label
a-to-b, a, b, follow
c-to-b, c, b, follow

ETL 実装

以上の内容を踏まえて、AWS Step Functions (SFN) を利用して以下のような ETL パイプラインを構築しました。ワークフロー全体をいくつかのステップに分割して SFN ワークフローとして実装し、メインのワークフローからネストで呼び出す形にしています。

  • CreateBulkloadData ... バルクロード用データの生成
  • Bulkload ... バルクロードジョブの登録(POST)
  • CheckBulkloadStatus ... バルクロードジョブのステータス確認(GET)

各ワークフローの詳細

各ステップの SFN ワークフローグラフ構造は以下のようになっています。

CreateBulkloadData

Glue ジョブを利用してバルクロードするデータを生成します。このステップではジョブ実行の順序性を意識する必要はありません。保守性・拡張性の観点から、単一 Glue ジョブにまとめず、グラフデータの Label 毎に Glue ジョブを用意しています。

当社では DB 各テーブルに、更新日時を表す timestamp 型カラムを持たせています。このカラムの値を利用してデータフレームをフィルタリングすることで、定期的な差分更新を実現しています。


Bulkload

Bulk Loader への POST リクエストを Lambda から実行することで、バルクロードジョブを生成します。このステップでは実行の順序性を考慮する必要があります。すなわち、Edge データの前に Vertex データのロードジョブを作成します。この実装例では Vertex, Edge それぞれ Map ステート を用意し、Label 毎にバルクロードジョブを生成しています。


CheckBulkloadStatus

Get-Status API を利用して、Bulkload ステップで作成されたロードジョブ群が実行完了しているかどうか確認しています。バルクロードジョブのステータス変更イベントは EventBridge で検知できないため、Wait ステート を活用するなどして定期的にステータスを確認する必要があります。


Slack 通知

最後に、Lambda から Amazon SNS → AWS Chatbot を介して Slack 通知させています。

詳細は割愛しますが、バルクロード実行に関するレポートはデータ基盤に取り込んでおり、通知された SFN ワークフローの実行名とロードジョブ ID を用いて、各ジョブの詳細情報を Redash 上で確認できるようにしています。

さいごに

Amazon Neptune における ETL の実装について紹介しました。Neptune を導入する際の参考になれば幸いです。当社でも導入してみたばかりなので、運用していく中で新たな知見を得られたらまたアウトプットしてみたいと思います。

最後まで読んで頂きありがとうございました。

関連記事

脚注
  1. https://docs.aws.amazon.com/neptune/latest/userguide/bulk-load.html ↩︎

  2. https://docs.aws.amazon.com/neptune/latest/userguide/dms-neptune.html ↩︎

  3. https://docs.aws.amazon.com/neptune/latest/userguide/load-api-reference-load.html ↩︎

  4. https://docs.aws.amazon.com/neptune/latest/userguide/load-api-reference-status.html ↩︎ ↩︎

  5. https://docs.aws.amazon.com/ja_jp/neptune/latest/userguide/load-api-reference-status-requests.html ↩︎

  6. https://docs.aws.amazon.com/ja_jp/neptune/latest/userguide/load-api-reference-status-response.html ↩︎

  7. https://docs.aws.amazon.com/neptune/latest/userguide/load-api-reference-cancel.html ↩︎

  8. https://docs.aws.amazon.com/neptune/latest/userguide/loader-message.html ↩︎

  9. https://docs.aws.amazon.com/neptune/latest/userguide/bulk-load-tutorial-format-gremlin.html ↩︎

Discussion