AWS Glueを利用してDynamoDBへ大量データ投入する方法
はじめに
AWS Glueを利用してDynamoDBへ大量データ投入する手順を記載していきます。作業時間は15-30分程度を想定しています。
初期データ移行などに使う手順となっております。通常運用にはIAM Role設定が大雑把となっておりますので微調整し流用頂ければと思います。また、GUIで設定している部分はコード化など検討が可能ですが、今回は対応していません。また各所で速度チューニングについても触れていきます。
前提条件
事前にS3へデータ投入ファイルをDynamoDBへ登録したいカタチにヘッダーつきCSVで作成していることとします。
CSVの整形(ETL)を行うとGlueが複雑化、速度低下します。切り分けて事前にS3に完成させておくことは初期データ投入において重要なポイントです。S3は作成済とします。
また、DynamoDBも事前に作成済とします。DynamoDBはオンデマンドで作成することが重要です。柔軟にキャパシティをGlueがコントロールしてくれることでしょう。
構築する全体像
3段階に分けてつくります。以下の順となります。
- 全体的に使うIAM Role作成
- ①スクリプトを実行するためのテーブル作り
- ②スクリプトの作成
全体的に使うIAM Role作成
手順を簡略化するためにAWS管理ポリシー+最低限のインラインポリシーを組み合わせます。正しくは①用と②用でロールを分けたほうがよいですが、作業手順削減を優先させています。
- AWS管理ポリシー
-
AWSGlueServiceRole
- Glue全般の許可
-
AmazonDynamoDBFullAccess
- ②のDynamoDB投入用
-
AWSGlueServiceRole
- インラインポリシー
- 投入用データCSVが保存されているS3の指定バケット許可
- GlueのJOB実行許可
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::dev-test/*"
]
},
{
"Effect": "Allow",
"Action": [
"glue:StartJobRun",
"glue:StartWorkflowRun"
],
"Resource": "*"
}
]
}
このようなカタチでロールが作成されました。
①スクリプトを実行するためのテーブル作り
Clawlerを作って対象のバケットを読み込み、データカタログ(テーブル)を作っていきましょう。
選択するバケット(階層指定可)には1種類のCSVファイル定義のみが入っている階層を指定する必要があることに注意してください。
Create Crawler。
Nameを入力しNext。
Add a sourceをクリックし、データカタログ(テーブル)を作りたいS3の階層を設定します。
ほかの項目は触らず、Add an S3 data sourceをクリックしNext。
IAM Roleに作成したものを設定しNext。
DataBaseを設定。作成していない場合はAdd Databaseしてから設定します
Table name prefix はテーブル名を判別しやすいようにつけるといいでしょう。デフォルトでは指定した階層名がテーブル名となるためです。
最後にCreate crawlerを行います。
作成したCrawlerを Run を押して実行しましょう。
Data Catalog>Databasesにテーブルが作成されています。
Nameをクリックし、下にすこしスクロール。Shemaの Edit shema as JSONをクリックします。
数字と判断されたデータはすべてbigintになっています。ttlはそのままでよいですが、stringであってほしい項目はすべてJSON編集から訂正し Save as new table versionを押して保存してください。
以上でGlueのスクリプトで利用していくデータカタログ(テーブル)が完成しました。
後続のプログラムではS3ではなく、データカタログにアクセスするようなプログラムとなります。
ここまでが、コード内容を簡略化させるポイントになる作業になります。
②スクリプトの作成
データカタログを読み込んでDynamoDBへデータ登録するスクリプトの作成、実行手順になります。
AWS Glue Studio > Jobs から Spark Script editerを選択しCreateをクリックします 。
スクリプトを記載します。dynamodb.throughput.write.percentは0.5が通常速度とし、最大1.5まで設定可能です。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glue_context = GlueContext(sc)
spark = glue_context.spark_session
job = Job(glue_context)
job.init(args["JOB_NAME"], args)
# Script generated for node S3 bucket
S3bucket_node1 = glue_context.create_dynamic_frame.from_catalog(
database="<①で作成したデータベース>", table_name="<①で作成したテーブル>", transformation_ctx="S3bucket_node1"
)
print(S3bucket_node1.getNumPartitions())
# Write Data to Dynamo DB table
glue_context.write_dynamic_frame_from_options(
frame=S3bucket_node1,
connection_type="dynamodb",
connection_options={"dynamodb.output.tableName": "<事前に用意しているデータ投入先のDynamoDBテーブル名>","dynamodb.throughput.write.percent":"1.5"})
job.commit()
JobDetail
JOB名をつけます。空白を入れないようにしましょう。xxx.pyの名前になるためローマ字が良いでしょう。
IAM Roleは作成したものを設定します。
Worker type は G 1Xを選びましょう。このプログラムはDynamoDBのPut時間がほぼすべてであり、CPUやメモリのサイズに影響しません。G 2Xにしてもパフォーマンスは向上しません。
Requsted number of workersは検証しながら数を調整しましょう。デフォルトは10になっていますが、このパラメータを増加させるとDynamoDBのPutパラレル度が上昇しGlueのパフォーマンスが向上していきます。
Job bookmarkはDisableにしましょう。一度読み込んだことのあるCSVデータは取込対象外とする便利な設定ですが、初期データ投入では再実行などをするときの邪魔になります。
Number of retriesは 0 にしましょう。データ投入用途では長時間JOBが予想されるので、誤ってリトライになった場合のリスクを避けます。
Job timeoutは適切な値を設定します。2880では48時間で2日になりますが、一度組んだJOBを30分程度実行し、DynamoDBに登録された実績データをみて目標データ量がどのくらいで投入できそうかを逆算。すこし多めに設定すると良いでしょう。
最後にRun をクリックすればデータ投入が始まります。
最後に念の為、件数比較
登録した件数をDynamoDBのスキャンで確認します。スクリプトに投入予定のデータフレーム(今回でいうとS3bucket_node1という変数)のレコード数をログに出力したり、CSVをヘッダー行を除いてカウントし、DynamoDBのスキャン数と一致していることを確認。欠損がないことを念の為確認します。一部のデータを検索し確認するのも良いでしょう。お疲れさまでした。
この記事がどなたかの役に立てれば幸いです。
Discussion