🚀

[AWS] コードを触りながら AWS Step Functionsの挙動を確認する②(Glue Studioとの連携)

2022/12/26に公開


はじめに

ご覧いただきありがとうございます。阿河です。

前回に引き続き、AWS Step Functionsを用いた自動化ワークフローを作っていきたいと思います。

今回はAWS Step FunctionsとGlueジョブを連携させていきます。

対象者

  • AWSを運用中の方
  • ワークフロー制御を行いたい方

概要

※前回記事の対象

  1. 事前準備
  2. AWS Step Functionsを実行するトリガーを用意する
  3. AWS Step Functions側の実装
  4. 検証①

※本記事の対象

  1. クローラー作成と実行
  2. GlueジョブとAWS Step Functionsの連携
  3. 検証②

本検証では「S3バケットの所定フォルダ配下にファイルをアップロードする」「DynamoDBの所定のテーブルを、重複処理チェック用に使用している」関係で、検証を実行する度にファイルの消去やテーブルのレコード情報をクリーンアップした上で行っております。
この初期化作業を行わない場合、想定していた動作と異なる結果になる可能性がありますので、ご了承ください。

5. クローラー作成と実行

S3側のフォルダ構成

今回は検証として、12/1付のCSVデータをS3バケットの所定のフォルダ配下にアップロードします。

前記事と同様に、CSVファイルは「バケット名/branch-officeA/12/1/」の配下にアップロードします。

この後Glueクローラーを走らせるので、CSVファイルをアップロードしておいてください。


アップロードするCSVファイルは上記のような構造にしています。

別途データ出力用のS3バケットを作成します。
「バケット名(CSVファイルアップロード用とは別バケット)/branch-officeA/12/1/」配下に、output1/output2と呼ばれるフォルダを作成しています。

出力結果はoutput1フォルダの配下に出力します。

Glue用のIAMロールを作成

  • 信頼されたエンティティタイプ: AWSのサービス
  • ユースケース: Glue
  • 権限: AWSGlueServiceRole/追加のS3権限

Glueデータベース作成

AWS Glueのコンソールページより、Databaseの画面に移行。

Name: 任意の名前

データベースが作成されます。

現時点ではテーブルは空の状態になっております。

Glueクローラーの作成

クローラーの作成を行います。

Name: 任意の名前

データソースの設定を行います。

  • Data source: S3
  • Location of S3 data: In this account
  • S3 path: s3://※CSVをアップロードするS3バケットの名前/branch_officeA/branch-officeA/12/1

Glue用に作成したIAMロールを設定します。

  • Target database: 先ほど作成したGlueデータベース
  • Table name prefix: branch_officeA/12/1
  • Crawler schedule: On demand

以上の設定でクローラーを作成します。

Glueクローラーの実行

クローラーを実行します。

クロールの実行が完了しました。
クローラーの実行が成功したら、指定したデータベースを確認します。

データベース配下にテーブル情報が作成されました。

Glueジョブで今セクションで作成したデータカタログを利用します。
なおカラム名が日本語であることが個人的には気になっています。

Glueジョブで、カラム名を英語表記に直していきたいと思います。

6. GlueジョブとAWS Step Functionsの連携

Glueジョブの作成

Glueジョブを作成します。
この記事はStep Functionsの実装がメインのため、Glueの処理部分はGUIベースでサクッと作っていきます。

ジョブの作成画面で、「Visual with a source and target」を選択します。
S3⇒S3のフローにします。

Createボタンを押すと、フロー図が現れます。
一つ一つ見ていきます。

  • ソースとなるS3バケット

データのソースとなるS3バケットを指定します。
データカタログテーブルでメタデータを管理しているため、「データベース」「テーブル」の指定で問題ありません。

  • Renameフィールド

ApplyMappingをRename Fieldに変更します。Node PropertiesのNode typeでアクションを変更できます。

Rename Fieldが現れるので、カラム名を英語表記に変更します。

Output schemaでは、変換した結果が表示されます。
残り2カラムも英語表記に変更が必要です。

ActionからRenameを2つ追加します。

フロー図から分離してしまっているので、関連づける必要があります。

Node Parentsを指定することで、親となるノードを指定することができます。
cf. Rename Field2はRename Field1を親として、関連付けます。

順番に並べ替えると、このようなイメージです。
Renameフィールドを設定した上で、Rename Field3のOutput schemaを表示すると、すべてのカラムが英語表記にできたことが分かります。

最後に出力先のS3バケットの設定です。

Format: CSV
Compression Type: None
S3 Target Location: 「※出力先バケット名/branch-officeA/12/1/output1」
Data Catalog update options: Do not update the Data Catalog

以上でフロー図の構成は終わりです。
GUIで作成した内容は、Scriptボタンを押すことでコードとして表示することもできます。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
    database="csv-transform-database",
    table_name="branch_officea/12/11",
    transformation_ctx="S3bucket_node1",
)

# Script generated for node Rename Field1
RenameField1_node1671012616209 = RenameField.apply(
    frame=S3bucket_node1,
    old_name="支社",
    new_name="branch",
    transformation_ctx="RenameField1_node1671012616209",
)

# Script generated for node Rename Field2
RenameField2_node1671012657279 = RenameField.apply(
    frame=RenameField1_node1671012616209,
    old_name="顧客id",
    new_name="customer_id",
    transformation_ctx="RenameField2_node1671012657279",
)

# Script generated for node Rename Field3
RenameField3_node1671012662064 = RenameField.apply(
    frame=RenameField2_node1671012657279,
    old_name="商品id",
    new_name="product_id",
    transformation_ctx="RenameField3_node1671012662064",
)

# Script generated for node S3 bucket
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=RenameField3_node1671012662064,
    connection_type="s3",
    format="csv",
    connection_options={
        "path": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
        "partitionKeys": [],
    },
    transformation_ctx="S3bucket_node3",
)

job.commit()

ここまで出来たら、「Job details」の設定を行います。

  • Name: 任意のジョブ名
  • IAM Role: 作成したGlueのロール
  • Requested number of workers: 0
  • Job bookmark: Disable
  • Number of retries: 0

設定が終わったら、Saveします。

Glueジョブの実行

RUNボタンを押すと、ジョブを実行できます。


1分前後で処理が終わると思います。
Run Statusが"Succeeded"になるのを待ちます。

出力先のS3バケットフォルダ配下に、ファイルが生成されています。
オブジェクト情報から、S3 Selectを利用してみましょう。

SQLクエリをかけます。

branch,customer_id,product_id
branch_A,11325678,productB
branch_A,11432525,productA
branch_A,11437543,productA
branch_A,11432525,productC
branch_A,11432525,productA
branch_A,11432435,productC
branch_A,11029255,productB
branch_A,12872525,productC
branch_A,11483625,productB
branch_A,11298525,productB

カラムが英語表記に変わっていることが分かります。

ステートマシンの編集

ステートマシンの編集をします。
現在のフローは以下の通りです。

  • Debug(Type: Pass)の部分を「Type: Task」に変更して、Glueジョブをトリガーします。
  • Glueジョブに「branch」「customer_id」「product_id」というパラメータを渡して、Glueジョブ内で使えるようにします。

最初にステートマシンのIAMロールに、Glueに関する権限を与えます。
次にステートマシンを編集します。

{
  "StartAt": "Check_DB1",
  "States":{
    "Check_DB1": {
      "Type": "Task",
      "Resource":"arn:aws:lambda:us-east-1:xxxxxxxxxxx:function:Check_DB",
      "ResultPath": "$.result",
      "Next": "Choice1"
    },
    "Choice1": {
    "Type": "Choice",
    "Choices": [
      {
        "Variable": "$.result",
        "NumericEquals": 1,
        "Next": "StartJob"
      },
      {
        "Variable": "$.result",
        "NumericEquals": 0,
        "Next": "Fail"
      } 
    ],
    "Default": "Fail"
    },
    "StartJob": {
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters":{
        "JobName":"csv-transform-studio-job2",
        "Arguments": {
          "--branch_param": "branch",
          "--customer_param": "customer_id",
          "--product_param": "product_id"
        }
      },
      "End": true
    },
      
    "Fail": {
      "Type": "Fail",
      "Cause":"Exist"
    }
  }
}

またGlueジョブ側でもパラメータを受け取る必要があります。
前回自動生成されたコードをもとに、Spark script editorで新たにジョブを作成してみました。

コードは下記のように編集します。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME", "branch_param", "customer_param", "product_param"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

br = args["branch_param"]
cp = args["customer_param"]
pp = args["product_param"]



# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
    database="csv-transform-database",
    table_name="branch_officea/12/11",
    transformation_ctx="S3bucket_node1",
)

# Script generated for node Rename Field1
RenameField1 = RenameField.apply(
    frame=S3bucket_node1,
    old_name="支社",
    new_name=br,
    transformation_ctx="RenameField1",
)

# Script generated for node Rename Field2
RenameField2 = RenameField.apply(
    frame=RenameField1,
    old_name="顧客id",
    new_name=cp,
    transformation_ctx="RenameField2",
)

# Script generated for node Rename Field3
RenameField3 = RenameField.apply(
    frame=RenameField2,
    old_name="商品id",
    new_name=pp,
    transformation_ctx="RenameField3",
)

# Script generated for node S3 bucket
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=RenameField3,
    connection_type="s3",
    format="csv",
    connection_options={
        "path": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
        "partitionKeys": [],
    },
    transformation_ctx="S3bucket_node3",
)

job.commit()

Job detailsは前回と同じ設定を行い、ジョブをSaveします。

ステートマシン側で呼び出すGlueジョブ名を変更します。

これで準備完了です。

7. 検証

CSVファイルをアップロードします。

実行は成功しました。
出力先のS3バケット内を確認します。

無事出力されました。

S3 Selectの結果も問題ありませんでした。

ちなみに上記の図は、ファイルアップロード時にCSVファイル以外をアップロードした際のパターンです。
「StartJob」が"States.TaskFailed"として失敗しています。

※States.TaskFailedの原因

"ErrorMessage": "An error occurred while calling o102.pyWriteDynamicFrame. Unable to parse file: branchA_20221201-test.xlsx",

※Glue側のRUNS

Glueジョブの失敗をキャッチできているようです。

https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/connect-to-resource.html#connect-sync

AWS BatchやAmazon ECSなど統合サービスの場合、Step Functions は次の状態に進む前にリクエストが完了するまで待機します。

"StartJob": {
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",

今回の検証は以上です。
必要に応じてステートマシンのフローを増やしてみてください。
※DynamoDBと引き続き連携させることもできます。

さいごに

次回以降は単発でStep Functionsの機能について、検証していきたいと思います。

御覧いただき ありがとうございました!

MEGAZONE株式会社 Tech Blog

Discussion