[AWS] コードを触りながら AWS Step Functionsの挙動を確認する②(Glue Studioとの連携)
はじめに
ご覧いただきありがとうございます。阿河です。
前回に引き続き、AWS Step Functionsを用いた自動化ワークフローを作っていきたいと思います。
今回はAWS Step FunctionsとGlueジョブを連携させていきます。
対象者
- AWSを運用中の方
- ワークフロー制御を行いたい方
概要
※前回記事の対象
- 事前準備
- AWS Step Functionsを実行するトリガーを用意する
- AWS Step Functions側の実装
- 検証①
※本記事の対象
- クローラー作成と実行
- GlueジョブとAWS Step Functionsの連携
- 検証②
本検証では「S3バケットの所定フォルダ配下にファイルをアップロードする」「DynamoDBの所定のテーブルを、重複処理チェック用に使用している」関係で、検証を実行する度にファイルの消去やテーブルのレコード情報をクリーンアップした上で行っております。
この初期化作業を行わない場合、想定していた動作と異なる結果になる可能性がありますので、ご了承ください。
5. クローラー作成と実行
S3側のフォルダ構成
今回は検証として、12/1付のCSVデータをS3バケットの所定のフォルダ配下にアップロードします。
前記事と同様に、CSVファイルは「バケット名/branch-officeA/12/1/」の配下にアップロードします。
この後Glueクローラーを走らせるので、CSVファイルをアップロードしておいてください。
アップロードするCSVファイルは上記のような構造にしています。
別途データ出力用のS3バケットを作成します。
「バケット名(CSVファイルアップロード用とは別バケット)/branch-officeA/12/1/」配下に、output1/output2と呼ばれるフォルダを作成しています。
出力結果はoutput1フォルダの配下に出力します。
Glue用のIAMロールを作成
- 信頼されたエンティティタイプ: AWSのサービス
- ユースケース: Glue
- 権限: AWSGlueServiceRole/追加のS3権限
Glueデータベース作成
AWS Glueのコンソールページより、Databaseの画面に移行。
Name: 任意の名前
データベースが作成されます。
現時点ではテーブルは空の状態になっております。
Glueクローラーの作成
クローラーの作成を行います。
Name: 任意の名前
データソースの設定を行います。
- Data source: S3
- Location of S3 data: In this account
- S3 path: s3://※CSVをアップロードするS3バケットの名前/branch_officeA/branch-officeA/12/1
Glue用に作成したIAMロールを設定します。
- Target database: 先ほど作成したGlueデータベース
- Table name prefix: branch_officeA/12/1
- Crawler schedule: On demand
以上の設定でクローラーを作成します。
Glueクローラーの実行
クローラーを実行します。
クロールの実行が完了しました。
クローラーの実行が成功したら、指定したデータベースを確認します。
データベース配下にテーブル情報が作成されました。
Glueジョブで今セクションで作成したデータカタログを利用します。
なおカラム名が日本語であることが個人的には気になっています。
Glueジョブで、カラム名を英語表記に直していきたいと思います。
6. GlueジョブとAWS Step Functionsの連携
Glueジョブの作成
Glueジョブを作成します。
この記事はStep Functionsの実装がメインのため、Glueの処理部分はGUIベースでサクッと作っていきます。
ジョブの作成画面で、「Visual with a source and target」を選択します。
S3⇒S3のフローにします。
Createボタンを押すと、フロー図が現れます。
一つ一つ見ていきます。
- ソースとなるS3バケット
データのソースとなるS3バケットを指定します。
データカタログテーブルでメタデータを管理しているため、「データベース」「テーブル」の指定で問題ありません。
- Renameフィールド
ApplyMappingをRename Fieldに変更します。Node PropertiesのNode typeでアクションを変更できます。
Rename Fieldが現れるので、カラム名を英語表記に変更します。
Output schemaでは、変換した結果が表示されます。
残り2カラムも英語表記に変更が必要です。
ActionからRenameを2つ追加します。
フロー図から分離してしまっているので、関連づける必要があります。
Node Parentsを指定することで、親となるノードを指定することができます。
cf. Rename Field2はRename Field1を親として、関連付けます。
順番に並べ替えると、このようなイメージです。
Renameフィールドを設定した上で、Rename Field3のOutput schemaを表示すると、すべてのカラムが英語表記にできたことが分かります。
最後に出力先のS3バケットの設定です。
Format: CSV
Compression Type: None
S3 Target Location: 「※出力先バケット名/branch-officeA/12/1/output1」
Data Catalog update options: Do not update the Data Catalog
以上でフロー図の構成は終わりです。
GUIで作成した内容は、Scriptボタンを押すことでコードとして表示することもできます。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
database="csv-transform-database",
table_name="branch_officea/12/11",
transformation_ctx="S3bucket_node1",
)
# Script generated for node Rename Field1
RenameField1_node1671012616209 = RenameField.apply(
frame=S3bucket_node1,
old_name="支社",
new_name="branch",
transformation_ctx="RenameField1_node1671012616209",
)
# Script generated for node Rename Field2
RenameField2_node1671012657279 = RenameField.apply(
frame=RenameField1_node1671012616209,
old_name="顧客id",
new_name="customer_id",
transformation_ctx="RenameField2_node1671012657279",
)
# Script generated for node Rename Field3
RenameField3_node1671012662064 = RenameField.apply(
frame=RenameField2_node1671012657279,
old_name="商品id",
new_name="product_id",
transformation_ctx="RenameField3_node1671012662064",
)
# Script generated for node S3 bucket
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
frame=RenameField3_node1671012662064,
connection_type="s3",
format="csv",
connection_options={
"path": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"partitionKeys": [],
},
transformation_ctx="S3bucket_node3",
)
job.commit()
ここまで出来たら、「Job details」の設定を行います。
- Name: 任意のジョブ名
- IAM Role: 作成したGlueのロール
- Requested number of workers: 0
- Job bookmark: Disable
- Number of retries: 0
設定が終わったら、Saveします。
Glueジョブの実行
RUNボタンを押すと、ジョブを実行できます。
1分前後で処理が終わると思います。
Run Statusが"Succeeded"になるのを待ちます。
出力先のS3バケットフォルダ配下に、ファイルが生成されています。
オブジェクト情報から、S3 Selectを利用してみましょう。
SQLクエリをかけます。
branch,customer_id,product_id
branch_A,11325678,productB
branch_A,11432525,productA
branch_A,11437543,productA
branch_A,11432525,productC
branch_A,11432525,productA
branch_A,11432435,productC
branch_A,11029255,productB
branch_A,12872525,productC
branch_A,11483625,productB
branch_A,11298525,productB
カラムが英語表記に変わっていることが分かります。
ステートマシンの編集
ステートマシンの編集をします。
現在のフローは以下の通りです。
- Debug(Type: Pass)の部分を「Type: Task」に変更して、Glueジョブをトリガーします。
- Glueジョブに「branch」「customer_id」「product_id」というパラメータを渡して、Glueジョブ内で使えるようにします。
最初にステートマシンのIAMロールに、Glueに関する権限を与えます。
次にステートマシンを編集します。
{
"StartAt": "Check_DB1",
"States":{
"Check_DB1": {
"Type": "Task",
"Resource":"arn:aws:lambda:us-east-1:xxxxxxxxxxx:function:Check_DB",
"ResultPath": "$.result",
"Next": "Choice1"
},
"Choice1": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.result",
"NumericEquals": 1,
"Next": "StartJob"
},
{
"Variable": "$.result",
"NumericEquals": 0,
"Next": "Fail"
}
],
"Default": "Fail"
},
"StartJob": {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Parameters":{
"JobName":"csv-transform-studio-job2",
"Arguments": {
"--branch_param": "branch",
"--customer_param": "customer_id",
"--product_param": "product_id"
}
},
"End": true
},
"Fail": {
"Type": "Fail",
"Cause":"Exist"
}
}
}
またGlueジョブ側でもパラメータを受け取る必要があります。
前回自動生成されたコードをもとに、Spark script editorで新たにジョブを作成してみました。
コードは下記のように編集します。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME", "branch_param", "customer_param", "product_param"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
br = args["branch_param"]
cp = args["customer_param"]
pp = args["product_param"]
# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
database="csv-transform-database",
table_name="branch_officea/12/11",
transformation_ctx="S3bucket_node1",
)
# Script generated for node Rename Field1
RenameField1 = RenameField.apply(
frame=S3bucket_node1,
old_name="支社",
new_name=br,
transformation_ctx="RenameField1",
)
# Script generated for node Rename Field2
RenameField2 = RenameField.apply(
frame=RenameField1,
old_name="顧客id",
new_name=cp,
transformation_ctx="RenameField2",
)
# Script generated for node Rename Field3
RenameField3 = RenameField.apply(
frame=RenameField2,
old_name="商品id",
new_name=pp,
transformation_ctx="RenameField3",
)
# Script generated for node S3 bucket
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
frame=RenameField3,
connection_type="s3",
format="csv",
connection_options={
"path": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"partitionKeys": [],
},
transformation_ctx="S3bucket_node3",
)
job.commit()
Job detailsは前回と同じ設定を行い、ジョブをSaveします。
ステートマシン側で呼び出すGlueジョブ名を変更します。
これで準備完了です。
7. 検証
CSVファイルをアップロードします。
実行は成功しました。
出力先のS3バケット内を確認します。
無事出力されました。
S3 Selectの結果も問題ありませんでした。
ちなみに上記の図は、ファイルアップロード時にCSVファイル以外をアップロードした際のパターンです。
「StartJob」が"States.TaskFailed"として失敗しています。
※States.TaskFailedの原因
"ErrorMessage": "An error occurred while calling o102.pyWriteDynamicFrame. Unable to parse file: branchA_20221201-test.xlsx",
※Glue側のRUNS
Glueジョブの失敗をキャッチできているようです。
AWS BatchやAmazon ECSなど統合サービスの場合、Step Functions は次の状態に進む前にリクエストが完了するまで待機します。
"StartJob": {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
今回の検証は以上です。
必要に応じてステートマシンのフローを増やしてみてください。
※DynamoDBと引き続き連携させることもできます。
さいごに
次回以降は単発でStep Functionsの機能について、検証していきたいと思います。
御覧いただき ありがとうございました!
Discussion