GlueからRDSへの並列読み取りの仕様を明らかにしよう
はじめに
先日、とあるプロジェクトでAmazon Web Services(AWS)のGlueを利用し、Apache Sparkを用いた並列処理を実装する機会がありました。Glueは大規模データの並列分散処理を得意とするサービスです。一般的にはGlueのソースとしてS3を利用するケースが多いですが、今回のプロジェクトではソースがRDSという珍しいケースでした。
GlueからRDSへの読み込みに関する情報はあまり公開されておらず、特に「どのようにRDSへ並列読み込みを実現しているのか」について、明確な情報がありませんでした。そこで、本記事ではGlueからRDSへの並列読み込み方法の実装例を記載し、さらにRDSのログを調査することで、どのように並列読み込みを実現しているかを明らかにします。
本記事の対象者
- Glue(Spark)を使用したことがある方
- GlueからRDSへの並列読み込みを実装したい方
記事の構成
記事の構成は以下の通りです。
- 環境準備
- プログラム実装方法
- 検証
- 最後に
1. 環境準備
1.1 アーキテクチャ
今回は以下のアーキテクチャで検証を実施します。
アーキテクチャ
ここでのポイントは、GlueをRDSと同じサブネット内で起動するために、Glue Connection(Glue接続)を用いてVPC設定を実施している点です。
Glue Connectionの詳細は公式ドキュメントが参考になります。
1.2 RDSの作成
DBエンジンにはPostgreSQLを使用します。基本的な作成手順は次の公式ドキュメントを参考にしました。
次に、全てのSQLログを出力するための設定を行います。カスタムパラメータグループを作成し、以下の2つのパラメータを変更します。
-
log_statement
:all
-
log_min_duration_statement
:0
設定の詳細については、公式ドキュメントとre:Postが参考になります。
1.3 Glue Connectionの作成
Glueジョブを作成する前に、Glue Connectionを用意します。設定値は以下の通りです。
-
Name:
GlueConnectionToRDS
-
Type(Data Source):
JDBC
-
JDBC URL:
jdbc:postgresql://<RDSのエンドポイント名>:<RDSのポート番号>/<DB名>
- UserName/Password:RDS作成時に設定した情報
- VPC/Subnet:RDSと同じVPCおよびサブネット
- セキュリティグループ:自己参照型のセキュリティグループを指定
JDBC URLの形式はDBエンジンごとに異なるため、詳細は以下の公式ドキュメントを確認してください。
また、セキュリティグループの設定については以下の記事や公式ドキュメントが参考になります。1.4 Glueジョブの作成
続いて、Glueジョブを作成します。リソース設定における主な項目は以下の通りです。
- IAM Role:GlueとRDSのアクセス権限を付与したロール
-
Type:
Spark
-
Glue Version:
Glue 5.0
-
Language:
Python 3
-
Worker type:
G1X
-
Number of workers:
3
- Connections:先ほど作成したGlue Connectionを指定
1.5 データの準備
RDSにログテーブルを作成し、そのテーブル内にダミーデータを挿入します。
テーブル定義
以下のようなシンプルなテーブルを作成します。
CREATE TABLE log (
customer_id INT NOT NULL,
status VARCHAR(50) NOT NULL,
amount NUMERIC(10,2) NOT NULL,
event_time TIMESTAMP NOT NULL
);
ダミーデータの作成
Pythonを使用してダミーデータを作成します。作成したデータは別途INSERTします。
statuses = ["new", "processing", "completed", "failed"]
# 1000件のサンプルデータを作成
for i in range(1000):
customer_id = random.randint(1000, 2000)
status = random.choice(statuses)
amount = round(random.uniform(100, 10000), 2)
event_time = datetime.now() - timedelta(minutes=i*5)
2. プログラム実装方法
これがプログラムの全体構成です。Glueジョブの基本構造の中で、RDSからのデータ抽出処理を追記します。
import sys
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# ----------------------------------------------
# ここに記述
# ----------------------------------------------
job.commit()
2.1 GlueからRDSへのデータ読み込み方法
今回はDynamicFrame
を用いて、RDSからデータを読み込みます。RDSへの接続情報はGlue Connectionに設定しているため、Glueジョブ内からはGlue Connectionの定義情報を参照します。他の設定項目は、公式ドキュメントだけでは分かりにくく、私自身苦労したため、ここで詳しく説明します。
- 基本:テーブル全量を並列なしで読み込み
これが基本の構成です。指定したテーブルを全量読み込みます。
dyf_rds = glueContext.create_dynamic_frame.from_options(
connection_type="postgresql",
connection_options={
"useConnectionProperties": True, # Connectionを使う場合に設定
"connectionName": "GlueConnectionToRDS", # 作成したConnection名
"dbtable": "log", # 読み込むテーブル名
},
transformation_ctx="dyf_rds",
)
- 条件付き読み込み
テーブル全量ではなく、一部のレコードだけを取得する場合、sampleQuery
にSQL文を渡します。
# statusがnewのデータのみ取得
sql_query = """SELECT * FROM log WHERE status = 'new'"""
dyf_rds = glueContext.create_dynamic_frame.from_options(
connection_type="postgresql",
connection_options={
"useConnectionProperties": True,
"connectionName": "GlueConnectionToRDS",
"dbtable": "log",
"sampleQuery": sql_query, # 条件付きクエリ
},
transformation_ctx="dyf_rds",
)
- 並列読み込み
並列で読み込みをする場合、hashpartitions
で並列数を指定し、分割キーをhashfield
または、hashexpression
で指定します。hashfield
とhashexpression
の違いについては公式ドキュメントにも説明がありますが、分かりづらいので、詳細は3章の検証で説明します。
dyf_rds = glueContext.create_dynamic_frame.from_options(
connection_type="postgresql",
connection_options={
"useConnectionProperties":True,
"connectionName": "GlueConnectionToRDS",
"dbtable":"log",
"hashpartitions": "3", # 並列数
"hashfield":"customer_id", # 分割キー
},
transformation_ctx="dyf_rds",
)
- 条件付き読み込み+並列読み込み
条件付き読み込み(sampleQuery)と並列読み込みを組み合わせる場合、追加設定が必要です。
-
enablePartitioningForSampleQuery
:True
を指定する -
WHERE
句の末尾にAND
を付ける
# statusがnewのデータのみ取得
sql_query = """SELECT * FROM log WHERE status = 'new' AND"""
dyf_rds = glueContext.create_dynamic_frame.from_options(
connection_type="postgresql",
connection_options={
"useConnectionProperties": True,
"connectionName": "GlueConnectionToRDS",
"dbtable": "log",
"sampleQuery": sql_query, # 抽出クエリ
"hashpartitions": "3", # 並列数
"hashexpression": "customer_id", # 分割キー
"enablePartitioningForSampleQuery": True, # サンプルクエリ+並列を有効化
},
transformation_ctx="dyf_rds",
)
3. 検証
第2章で説明した実装方法をもとにジョブを実行し、RDSのログを確認しながら、挙動を検証します。並列読み込みには分割キーの指定にはhashfield
とhashexpression
の2種類があるため、合計5パターンで確認します。
3.1 テーブル全量を並列なしで読み込み
まずは、テーブル全件を並列なしで読み込むケースです。RDSのログを確認すると、次の2つのクエリが発行されてました。
1 SELECT * FROM log WHERE 1=0
2 SELECT * FROM log
- 1つ目のクエリ:
WHERE 1=0
が付与されており、常にFalse
となるため、レコードは返却されず、スキーマ情報のみ取得 - 2つ目のクエリ:
SELECT * FROM log
により、1つのエグゼキュータが全件を取得
つまり、プログラムで指定したテーブルlog
を基に、全件取得のクエリが1度だけ発行されていることが分かります。
3.2 条件付き読み込み
次に、テーブルの全件ではなく、WHERE句等で絞ってデータを取得するケースです。
1 SELECT * FROM (SELECT * FROM log WHERE status = 'new') as log WHERE 1=0
2 SELECT * FROM (SELECT * FROM log WHERE status = 'new') as log
- 1つ目のクエリ:スキーマ情報のみ取得
- 2つ目のクエリ:プログラムで指定した
sampleQuery
を実行し、条件に一致するレコードを取得
このケースも並列数は指定していないので、1つのエグゼキュータが単独でデータを取得しています。
3.3 並列読み込み|hashfield
並列数を3,分割キーとしてhashfield
にcustomer_id
を指定したケースです。
1 SELECT * FROM (select * from log WHERE ('x'||SUBSTR(MD5(customer_id::TEXT), 25, 8))::BIT(31)::INT % 3 = 0) as log WHERE 1=0
2 SELECT * FROM (select * from log WHERE ('x'||SUBSTR(MD5(customer_id::TEXT), 25, 8))::BIT(31)::INT % 3 = 1) as log WHERE 1=0
3 SELECT * FROM (select * from log WHERE ('x'||SUBSTR(MD5(customer_id::TEXT), 25, 8))::BIT(31)::INT % 3 = 2) as log WHERE 1=0
4 SELECT * FROM (select * from log WHERE ('x'||SUBSTR(MD5(customer_id::TEXT), 25, 8))::BIT(31)::INT % 3 = 1) as log
5 SELECT * FROM (select * from log WHERE ('x'||SUBSTR(MD5(customer_id::TEXT), 25, 8))::BIT(31)::INT % 3 = 0) as log
6 SELECT * FROM (select * from log WHERE ('x'||SUBSTR(MD5(customer_id::TEXT), 25, 8))::BIT(31)::INT % 3 = 2) as log
ここで確認できるポイントは以下の通りです。
- 1~3番目のクエリ:スキーマ情報のみ取得
- 4~6番目のクエリ:実際のデータを取得
-
'x'||SUBSTR(MD5(customer_id::TEXT), 25, 8))::BIT(31)::INT
の処理詳細-
customer_id
を文字列にキャスト - MD5でハッシュ化
- 一部を切り出しビット列に変換
- 最終的にINT型にキャスト
-
- 各条件式の末尾が
% 3 = 0, 1, 2
となっており、3つのエグゼキュータが並列でデータを取得
つまり、customer_id
をキーにMD5でハッシュ化して分散し、その結果を並列数毎に分割してクエリを発行しています。これにより、各エグゼキュータが取得するデータ量の偏り(データスキュー)が出にくくなり、効率的な処理につながります。
また、分割キーは最初に文字列にキャストされるため、カラムのデータ型に依存せず任意のカラムを分割キーに指定できます。
この分割するクエリはユーザ側で書く必要はなく、Glueが内部で自動生成し、RDSへ発行します。
3.4 並列読み込み|hashexpression
並列数を3,分割キーとしてhashexpression
にcustomer_id
を指定したケースです。
1 SELECT * FROM (select * from log WHERE customer_id % 3 = 0) as log WHERE 1=0
2 SELECT * FROM (select * from log WHERE customer_id % 3 = 1) as log WHERE 1=0
3 SELECT * FROM (select * from log WHERE customer_id % 3 = 2) as log WHERE 1=0
4 SELECT * FROM (select * from log WHERE customer_id % 3 = 2) as log
5 SELECT * FROM (select * from log WHERE customer_id % 3 = 1) as log
6 SELECT * FROM (select * from log WHERE customer_id % 3 = 0) as log
ここで確認できるポイントは以下の通りです。
- 1~3番目のクエリ:スキーマ情報のみ取得
- 4~6番目のクエリ:実際のデータを取得
- 条件式は
customer_id % 3 = 0, 1, 2
とシンプルに分割
この方法は、hashfield
と異なり、ハッシュ関数を使わず、分割キーに直接mod演算で分割しています。そのため、計算量は少なくシンプルですが、分割キーの分布によっては、データスキューが発生しやすいです。
また、分割キーにはINT型のみ指定可能です。他のデータ型を指定するとエラーが発生します。ただし、CAST(number AS INT)
のように、式を使ってINTに変換したものを指定すれば問題なく動作します。
3.5 条件付き読み込み+並列読み込み
WHERE句等でデータを絞りつつ、並列数を3,分割キーとしてhashexpression
にcustomer_id
を指定したケースです。
1 SELECT * FROM (SELECT * FROM log WHERE status = 'new' AND customer_id % 3 = 0) as log WHERE 1=0
2 SELECT * FROM (SELECT * FROM log WHERE status = 'new' AND customer_id % 3 = 1) as log WHERE 1=0
3 SELECT * FROM (SELECT * FROM log WHERE status = 'new' AND customer_id % 3 = 2) as log WHERE 1=0
4 SELECT * FROM (SELECT * FROM log WHERE status = 'new' AND customer_id % 3 = 0) as log
5 SELECT * FROM (SELECT * FROM log WHERE status = 'new' AND customer_id % 3 = 2) as log
6 SELECT * FROM (SELECT * FROM log WHERE status = 'new' AND customer_id % 3 = 1) as log
- 1~3番目のクエリ:スキーマ情報のみ取得
- 4~6番目のクエリ:実際のデータを取得
-
sampleQuery
で指定したクエリSELECT * FROM log WHERE status = 'new' AND
と分割条件が組み合わさった条件式となっている。
つまり、ユーザが指定した条件クエリとGlueが内部で生成した並列処理用の分割条件が結合され、1つのWHERE句で適用されています。
4. 最後に
本記事では、GlueからRDSへの並列読み込み方法の実装例を記載し、さらにRDSのログを確認することで、Glueの並列読み込み時の挙動を調査しました。GlueやSparkでは、データがどのようにパーティションされるかを理解することは性能観点で非常に重要です。また、並列数を例えば20のように大きく設定すると、RDSには同時に20本のクエリを発行するため、DB側への負荷が大きくなりそうということも分かりました。
今回の調査は挙動を明らかにすることが目的でしたが、実案件でRDSへの並列読み込みを実施する際は、「並列数をどうするのか」「分割キーはどうするのか」「データスキューは問題ないか」「DB負荷は問題ないか」といった観点を意識する必要がありそうです。この記事がその検討の参考になれば幸いです。

NTT DATA公式アカウントです。 技術を愛するNTT DATAの技術者が、気軽に楽しく発信していきます。 当社のサービスなどについてのお問い合わせは、 お問い合わせフォーム nttdata.com/jp/ja/contact-us/ へお願いします。