NTT DATA TECH
🏔️

GlueからRDSへの並列読み取りの仕様を明らかにしよう

に公開

はじめに

先日、とあるプロジェクトでAmazon Web Services(AWS)のGlueを利用し、Apache Sparkを用いた並列処理を実装する機会がありました。Glueは大規模データの並列分散処理を得意とするサービスです。一般的にはGlueのソースとしてS3を利用するケースが多いですが、今回のプロジェクトではソースがRDSという珍しいケースでした。

GlueからRDSへの読み込みに関する情報はあまり公開されておらず、特に「どのようにRDSへ並列読み込みを実現しているのか」について、明確な情報がありませんでした。そこで、本記事ではGlueからRDSへの並列読み込み方法の実装例を記載し、さらにRDSのログを調査することで、どのように並列読み込みを実現しているかを明らかにします

本記事の対象者

  • Glue(Spark)を使用したことがある方
  • GlueからRDSへの並列読み込みを実装したい方

記事の構成

記事の構成は以下の通りです。

  1. 環境準備
  2. プログラム実装方法
  3. 検証
  4. 最後に

1. 環境準備

1.1 アーキテクチャ

今回は以下のアーキテクチャで検証を実施します。

アーキテクチャ

ここでのポイントは、GlueをRDSと同じサブネット内で起動するために、Glue Connection(Glue接続)を用いてVPC設定を実施している点です。

Glue Connectionの詳細は公式ドキュメントが参考になります。
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/glue-connections.html

1.2 RDSの作成

DBエンジンにはPostgreSQLを使用します。基本的な作成手順は次の公式ドキュメントを参考にしました。
https://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/UserGuide/CHAP_GettingStarted.CreatingConnecting.PostgreSQL.html#CHAP_GettingStarted.Creating.PostgreSQL

次に、全てのSQLログを出力するための設定を行います。カスタムパラメータグループを作成し、以下の2つのパラメータを変更します。

  • log_statement all
  • log_min_duration_statement 0

設定の詳細については、公式ドキュメントとre:Postが参考になります。
https://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/UserGuide/USER_LogAccess.Concepts.PostgreSQL.Query_Logging.html
https://repost.aws/ja/knowledge-center/rds-postgresql-query-logging

1.3 Glue Connectionの作成

Glueジョブを作成する前に、Glue Connectionを用意します。設定値は以下の通りです。

  • NameGlueConnectionToRDS
  • Type(Data Source)JDBC
  • JDBC URLjdbc:postgresql://<RDSのエンドポイント名>:<RDSのポート番号>/<DB名>
  • UserName/Password:RDS作成時に設定した情報
  • VPC/Subnet:RDSと同じVPCおよびサブネット
  • セキュリティグループ:自己参照型のセキュリティグループを指定

JDBC URLの形式はDBエンジンごとに異なるため、詳細は以下の公式ドキュメントを確認してください。
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/connection-properties.html#connection-properties-required
また、セキュリティグループの設定については以下の記事や公式ドキュメントが参考になります。
https://qiita.com/pioho07/items/7223c77f341b7909431e
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/setup-vpc-for-glue-access.html

1.4 Glueジョブの作成

続いて、Glueジョブを作成します。リソース設定における主な項目は以下の通りです。

  • IAM Role:GlueとRDSのアクセス権限を付与したロール
  • TypeSpark
  • Glue VersionGlue 5.0
  • LanguagePython 3
  • Worker typeG1X
  • Number of workers3
  • Connections:先ほど作成したGlue Connectionを指定

1.5 データの準備

RDSにログテーブルを作成し、そのテーブル内にダミーデータを挿入します。

テーブル定義

以下のようなシンプルなテーブルを作成します。

CREATE TABLE log (
    customer_id INT NOT NULL,
    status VARCHAR(50) NOT NULL,
    amount NUMERIC(10,2) NOT NULL,
    event_time TIMESTAMP NOT NULL
);

ダミーデータの作成

Pythonを使用してダミーデータを作成します。作成したデータは別途INSERTします。

statuses = ["new", "processing", "completed", "failed"]

# 1000件のサンプルデータを作成
for i in range(1000):
    customer_id = random.randint(1000, 2000)
    status = random.choice(statuses)
    amount = round(random.uniform(100, 10000), 2)
    event_time = datetime.now() - timedelta(minutes=i*5)

2. プログラム実装方法

これがプログラムの全体構成です。Glueジョブの基本構造の中で、RDSからのデータ抽出処理を追記します。

import sys
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
 
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# ----------------------------------------------
# ここに記述
# ----------------------------------------------
 
job.commit()

2.1 GlueからRDSへのデータ読み込み方法

今回はDynamicFrameを用いて、RDSからデータを読み込みます。RDSへの接続情報はGlue Connectionに設定しているため、Glueジョブ内からはGlue Connectionの定義情報を参照します。他の設定項目は、公式ドキュメントだけでは分かりにくく、私自身苦労したため、ここで詳しく説明します。
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-programming-etl-connect-jdbc-home.html

  1. 基本:テーブル全量を並列なしで読み込み
    これが基本の構成です。指定したテーブルを全量読み込みます。
dyf_rds = glueContext.create_dynamic_frame.from_options(
    connection_type="postgresql",
    connection_options={
        "useConnectionProperties": True,                # Connectionを使う場合に設定
        "connectionName": "GlueConnectionToRDS",        # 作成したConnection名
        "dbtable": "log",                               # 読み込むテーブル名
    },
    transformation_ctx="dyf_rds",
)
  1. 条件付き読み込み
    テーブル全量ではなく、一部のレコードだけを取得する場合、sampleQueryにSQL文を渡します。
# statusがnewのデータのみ取得
sql_query = """SELECT * FROM log WHERE status = 'new'"""
dyf_rds = glueContext.create_dynamic_frame.from_options(
    connection_type="postgresql",
    connection_options={
        "useConnectionProperties": True,
        "connectionName": "GlueConnectionToRDS",
        "dbtable": "log",
        "sampleQuery": sql_query,                       # 条件付きクエリ
    },
    transformation_ctx="dyf_rds",
)
  1. 並列読み込み
    並列で読み込みをする場合、hashpartitionsで並列数を指定し、分割キーをhashfieldまたは、hashexpressionで指定します。hashfieldhashexpressionの違いについては公式ドキュメントにも説明がありますが、分かりづらいので、詳細は3章の検証で説明します。

https://docs.aws.amazon.com/ja_jp/glue/latest/dg/run-jdbc-parallel-read-job.html

dyf_rds = glueContext.create_dynamic_frame.from_options(
    connection_type="postgresql",
    connection_options={
        "useConnectionProperties":True,              
        "connectionName": "GlueConnectionToRDS", 
        "dbtable":"log",
        "hashpartitions": "3",                         # 並列数
        "hashfield":"customer_id",                     # 分割キー    
    },
    transformation_ctx="dyf_rds",
)
  1. 条件付き読み込み+並列読み込み
    条件付き読み込み(sampleQuery)と並列読み込みを組み合わせる場合、追加設定が必要です。
  • enablePartitioningForSampleQueryTrueを指定する
  • WHERE句の末尾にANDを付ける
# statusがnewのデータのみ取得
sql_query = """SELECT * FROM log WHERE status = 'new' AND"""
dyf_rds = glueContext.create_dynamic_frame.from_options(
    connection_type="postgresql",
    connection_options={
        "useConnectionProperties": True,
        "connectionName": "GlueConnectionToRDS",
        "dbtable": "log",
        "sampleQuery": sql_query,                       # 抽出クエリ
        "hashpartitions": "3",                          # 並列数
        "hashexpression": "customer_id",                # 分割キー
        "enablePartitioningForSampleQuery": True,       # サンプルクエリ+並列を有効化
    },
    transformation_ctx="dyf_rds",
)

3. 検証

第2章で説明した実装方法をもとにジョブを実行し、RDSのログを確認しながら、挙動を検証します。並列読み込みには分割キーの指定にはhashfieldhashexpressionの2種類があるため、合計5パターンで確認します。

3.1 テーブル全量を並列なしで読み込み

まずは、テーブル全件を並列なしで読み込むケースです。RDSのログを確認すると、次の2つのクエリが発行されてました。

1  SELECT * FROM log WHERE 1=0
2  SELECT * FROM log
  • 1つ目のクエリ:WHERE 1=0が付与されており、常にFalseとなるため、レコードは返却されず、スキーマ情報のみ取得
  • 2つ目のクエリ:SELECT * FROM logにより、1つのエグゼキュータが全件を取得

つまり、プログラムで指定したテーブルlogを基に、全件取得のクエリが1度だけ発行されていることが分かります。

3.2 条件付き読み込み

次に、テーブルの全件ではなく、WHERE句等で絞ってデータを取得するケースです。

1  SELECT * FROM (SELECT * FROM log WHERE status = 'new') as log WHERE 1=0
2  SELECT * FROM (SELECT * FROM log WHERE status = 'new') as log 
  • 1つ目のクエリ:スキーマ情報のみ取得
  • 2つ目のクエリ:プログラムで指定したsampleQueryを実行し、条件に一致するレコードを取得

このケースも並列数は指定していないので、1つのエグゼキュータが単独でデータを取得しています。

3.3 並列読み込み|hashfield

並列数を3,分割キーとしてhashfieldcustomer_idを指定したケースです。

1  SELECT * FROM (select * from log WHERE ('x'||SUBSTR(MD5(customer_id::TEXT), 25, 8))::BIT(31)::INT % 3 = 0) as log WHERE 1=0
2  SELECT * FROM (select * from log WHERE ('x'||SUBSTR(MD5(customer_id::TEXT), 25, 8))::BIT(31)::INT % 3 = 1) as log WHERE 1=0
3  SELECT * FROM (select * from log WHERE ('x'||SUBSTR(MD5(customer_id::TEXT), 25, 8))::BIT(31)::INT % 3 = 2) as log WHERE 1=0
4  SELECT * FROM (select * from log WHERE ('x'||SUBSTR(MD5(customer_id::TEXT), 25, 8))::BIT(31)::INT % 3 = 1) as log 
5  SELECT * FROM (select * from log WHERE ('x'||SUBSTR(MD5(customer_id::TEXT), 25, 8))::BIT(31)::INT % 3 = 0) as log 
6  SELECT * FROM (select * from log WHERE ('x'||SUBSTR(MD5(customer_id::TEXT), 25, 8))::BIT(31)::INT % 3 = 2) as log 

ここで確認できるポイントは以下の通りです。

  • 1~3番目のクエリ:スキーマ情報のみ取得
  • 4~6番目のクエリ:実際のデータを取得
  • 'x'||SUBSTR(MD5(customer_id::TEXT), 25, 8))::BIT(31)::INTの処理詳細
    • customer_idを文字列にキャスト
    • MD5でハッシュ化
    • 一部を切り出しビット列に変換
    • 最終的にINT型にキャスト
  • 各条件式の末尾が% 3 = 0, 1, 2となっており、3つのエグゼキュータが並列でデータを取得

つまり、customer_idをキーにMD5でハッシュ化して分散し、その結果を並列数毎に分割してクエリを発行しています。これにより、各エグゼキュータが取得するデータ量の偏り(データスキュー)が出にくくなり、効率的な処理につながります。
また、分割キーは最初に文字列にキャストされるため、カラムのデータ型に依存せず任意のカラムを分割キーに指定できます。

この分割するクエリはユーザ側で書く必要はなく、Glueが内部で自動生成し、RDSへ発行します。

3.4 並列読み込み|hashexpression

並列数を3,分割キーとしてhashexpressioncustomer_idを指定したケースです。

1  SELECT * FROM (select * from log WHERE customer_id % 3 = 0) as log WHERE 1=0
2  SELECT * FROM (select * from log WHERE customer_id % 3 = 1) as log WHERE 1=0
3  SELECT * FROM (select * from log WHERE customer_id % 3 = 2) as log WHERE 1=0
4  SELECT * FROM (select * from log WHERE customer_id % 3 = 2) as log 
5  SELECT * FROM (select * from log WHERE customer_id % 3 = 1) as log 
6  SELECT * FROM (select * from log WHERE customer_id % 3 = 0) as log 

ここで確認できるポイントは以下の通りです。

  • 1~3番目のクエリ:スキーマ情報のみ取得
  • 4~6番目のクエリ:実際のデータを取得
  • 条件式はcustomer_id % 3 = 0, 1, 2とシンプルに分割

この方法は、hashfieldと異なり、ハッシュ関数を使わず、分割キーに直接mod演算で分割しています。そのため、計算量は少なくシンプルですが、分割キーの分布によっては、データスキューが発生しやすいです。
また、分割キーにはINT型のみ指定可能です。他のデータ型を指定するとエラーが発生します。ただし、CAST(number AS INT)のように、式を使ってINTに変換したものを指定すれば問題なく動作します。

3.5 条件付き読み込み+並列読み込み

WHERE句等でデータを絞りつつ、並列数を3,分割キーとしてhashexpressioncustomer_idを指定したケースです。

1  SELECT * FROM (SELECT * FROM log WHERE status = 'new' AND customer_id % 3 = 0) as log WHERE 1=0
2  SELECT * FROM (SELECT * FROM log WHERE status = 'new' AND customer_id % 3 = 1) as log WHERE 1=0
3  SELECT * FROM (SELECT * FROM log WHERE status = 'new' AND customer_id % 3 = 2) as log WHERE 1=0
4  SELECT * FROM (SELECT * FROM log WHERE status = 'new' AND customer_id % 3 = 0) as log 
5  SELECT * FROM (SELECT * FROM log WHERE status = 'new' AND customer_id % 3 = 2) as log 
6  SELECT * FROM (SELECT * FROM log WHERE status = 'new' AND customer_id % 3 = 1) as log 
  • 1~3番目のクエリ:スキーマ情報のみ取得
  • 4~6番目のクエリ:実際のデータを取得
  • sampleQueryで指定したクエリSELECT * FROM log WHERE status = 'new' ANDと分割条件が組み合わさった条件式となっている。

つまり、ユーザが指定した条件クエリとGlueが内部で生成した並列処理用の分割条件が結合され、1つのWHERE句で適用されています。

4. 最後に

本記事では、GlueからRDSへの並列読み込み方法の実装例を記載し、さらにRDSのログを確認することで、Glueの並列読み込み時の挙動を調査しました。GlueやSparkでは、データがどのようにパーティションされるかを理解することは性能観点で非常に重要です。また、並列数を例えば20のように大きく設定すると、RDSには同時に20本のクエリを発行するため、DB側への負荷が大きくなりそうということも分かりました。

今回の調査は挙動を明らかにすることが目的でしたが、実案件でRDSへの並列読み込みを実施する際は、「並列数をどうするのか」「分割キーはどうするのか」「データスキューは問題ないか」「DB負荷は問題ないか」といった観点を意識する必要がありそうです。この記事がその検討の参考になれば幸いです。

NTT DATA TECH
NTT DATA TECH
設定によりコメント欄が無効化されています