🗂️

Dataproc Serverless でテンプレートを利用したバッチ処理を実行する

2023/08/22に公開

はじめに

こんにちは、クラウドエース データML ディビジョン所属の松本です。
クラウドエースのITエンジニアリングを担うシステム開発部の中で、特にデータ基盤構築・分析基盤構築からデータ分析までを含む一貫したデータ課題の解決を専門とするのがデータML ディビジョンです。

データML ディビジョンでは活動の一環として、毎週 Google Cloud の新規リリースを調査・発表し、データ領域のプロダクトのキャッチアップをしています。その中でも重要と考えるリリースを本ページ含め記事として公開しています。

今回紹介するリリースは、Dataproc の「Dataproc Serverless テンプレート」についてです。
このリリースにより、Dataproc Serverless でテンプレートを利用したバッチ処理を作成することが可能になりました。

Dataproc Serverless とは

ロゴ
Dataproc Serverless とは、Apache Spark や Apache Hadoop などのオープンソースデータ処理フレームワークを使用して、大規模なデータ処理を行うためのフルマネジードサービスです。
サーバーレスであるため、クラスタの管理やスケーリングなどのタスクをユーザーが行う必要がなく、必要なリソースを自動的に割り当て、ワークロードを実行します。
https://cloud.google.com/dataproc-serverless/docs?hl=ja

今回のリリースについて

今回ご紹介するのは、Dataproc Serverless テンプレート という、Dataproc Serverless のバッチ処理用のテンプレートに関して、既存のテンプレートに加え、新たに5つのテンプレートが提供されたことについてです。(2023年6月26日付に発表)

https://cloud.google.com/dataproc/docs/release-notes#June_26_2023

リリース概要

Dataproc Serverless テンプレートの追加提供について

Dataproc Serverless テンプレート を使用すると、コードを記述することなく、Spark でのバッチ処理をすばやく実行できます。
このテンプレートは JavaPython の両方で提供されています。

今回のリリースにより、以下のテンプレートが新たに追加されました。

  • Cloud Storage to BigQuery
  • Cloud Storage to Cloud Spanner
  • Hive to Cloud Storage
  • JDBC to BigQuery
  • JDBC to Cloud Storage

その他にも、以下のテンプレートが提供されています。(2023年8月15日時点)

  • Cloud Spanner to Cloud Storage
  • Cloud Storage to Cloud Storage
  • Cloud Storage to JDBC
  • Hive to BigQuery
  • JDBC to Cloud Spanner
  • JDBC to JDBC
  • Pub/Sub to Cloud Storage

ユースケース

Dataproc Serverless テンプレートのユースケースとしては、以下などが挙げられます。

  • コードを記述することなく、一般的な Spark バッチワークロードをすばやく実行したい
  • Dataproc Serverless の利点を活かし、大規模なデータのバッチワークロードをすばやく実行したい
  • Hive などの Hadoop 上の分散ストレージにあるデータの処理をすばやく実行したい
  • Spark に関して詳しくないが、Dataproc Serverless でのデータパイプラインを構築したい

Dataflow テンプレートとの違い

Google Cloud では Dataflow というフルマネージドでサーバレスな分散処理サービスがあり、
Dataflow にも、Dataproc Serverless テンプレートと似たような Google 提供のテンプレート が用意されています。

現状、Dataflow の方が提供されているテンプレート数が多く、ストリーミング処理用のテンプレートも提供されているため、多くのケースでは、Dataflow テンプレートでも要件を満たすことができます。

しかし、Dataflow テンプレートにはHiveに関するテンプレートなど、一部提供されていないものもあるため、そういったテンプレートを利用した場合は、Dataproc Serverless テンプレートを活用できます。

また、Dataproc Serverless テンプレートでは、Spark SQL を利用してのデータ変換が可能なため、SQLでデータ変換処理を実装したい場合は適しています。
(尚、Dataflow の Google 提供テンプレートでは、JavaScript でユーザー定義関数を記述することにより、データ変換が可能です。)

試してみた

今回は、Dataproc Serverless テンプレートである GCS To BigQuery を利用して、GCS から BigQuery への簡易なETLパイプラインを実装してみたい思います。

1. Dataproc Serverless のステージング用 GCS バケットの作成

Dataproc Serverless のステージング場所として使用する GCS バケットを作成します。
Dataproc Serverless はこのバケットを使用して、サーバーレスクラスタの実行に必要な依存関係を保存します。

$ export TEMP_BUCKET=my-staging-bucket
$ gsutil mb -l asia-northeast1 gs://$TEMP_BUCKET

2. インプットデータの準備

インプットとなる csv ファイルを作成します。

test.csv
item_id,sales,datetime
item01,1.23,2023-01-01 00:00:00
item02,2.34,2023-01-02 00:00:00

GCS バケットを作成し、csv ファイルをアップロードします。

$ export INPUT_BUCKET=my-input-bucket
$ gsutil mb -l asia-northeast1 gs://$INPUT_BUCKET
$ gsutil cp test.csv gs://$INPUT_BUCKET

3. VPC ネットワークのデフォルト サブネットに対する Google 公開アクセスの許可

今回は、Dataproc の VPC ネットワークのサブネットをデフォルトで使用しますが、事前に、Google 公開アクセスをオンにしておく必要があります。

$ gcloud compute networks subnets update default --region=asia-northeast1 --enable-private-ip-google-access

4. Dataproc Serverless のジョブを実行する

gcloud コマンドを使用して、Dataproc Serverless ジョブを実行します。
以下のコマンドでは、Spark SQL を使用してデータを変換した後、BigQueryへデータを格納しています。

$ PROJECT_ID=my_project
$ REGION=asia-northeast1
$ TEMPLATE_VERSION=latest
$ CLOUD_STORAGE_PATH=gs://$INPUT_BUCKET/test.csv
$ FORMAT=csv
$ DATASET=my_dataset
$ TABLE=my_table
$ TEMPVIEW=my_data
$ SQL_QUERY="select item_id, cast(round(sales, 0) as int) as sales, cast(datetime as date) as date from global_temp.my_data"

$ gcloud dataproc batches submit spark \
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate \
    --version="1.1" \
    --project="$PROJECT_ID" \
    --region="$REGION" \
    --jars="gs://dataproc-templates-binaries/$TEMPLATE_VERSION/java/dataproc-templates.jar" \
    -- --template=GCSTOBIGQUERY \
    --templateProperty project.id="$PROJECT_ID" \
    --templateProperty gcs.bigquery.input.location="$CLOUD_STORAGE_PATH" \
    --templateProperty gcs.bigquery.input.format="$FORMAT" \
    --templateProperty gcs.bigquery.output.dataset="$DATASET" \
    --templateProperty gcs.bigquery.output.table="$TABLE" \
    --templateProperty gcs.bigquery.temp.bucket.name="$TEMP_BUCKET" \
    --templateProperty gcs.bigquery.temp.table="$TEMPVIEW" \
    --templateProperty gcs.bigquery.temp.query="$SQL_QUERY"

上記コマンドでは、以下のジョブ構成を指定しています。

変数名 説明
PROJECT_ID Dataproc Serverless を実行する GCP プロジェクト
REGION Dataproc Serverless を実行するリージョン
TEMP_BUCKET Dataproc Serverless のステージング用 GCS バケットの場所
CLOUD_STORAGE_PATH インプットとなる GCS バケット内の csv ファイルのURI
FORMAT インプットのデータ形式( avro、parquet、csv、json )を指定
DATASET BigQuery の宛先データセット
TABLE BigQuery の宛先テーブル
TEMPVIEW (省略可)Spark SQL でのデータ変換における、一時ビュー名
SQL_QUERY (省略可)Spark SQL でのデータ変換における、クエリステートメント
※TEMPVIEW と SQL_QUERY のテーブル名が一致する必要があります

尚、データ変換は以下のクエリ文を指定することで行なっています。

$ SQL_QUERY="select item_id, cast(round(sales, 0) as int) as sales, cast(datetime as date) as date from global_temp.my_data"
  • item_id:変換なし
  • sales:元の値を小数点第1位で四捨五入し、int 型に変換
  • datetime:date 型に変換し、カラム名を date に変更

参考

テンプレート実行方法の詳細は、以下をご参考ください。
https://cloud.google.com/dataproc-serverless/docs/templates/storage-to-bigquery?hl=ja#use_the_template
https://github.com/GoogleCloudPlatform/dataproc-templates/tree/main/java/src/main/java/com/google/cloud/dataproc/templates/gcs

Spark SQL に関しての詳細は、以下をご参考ください。
https://spark.apache.org/docs/latest/sql-ref.html

5. Dataproc Serverless のジョブ結果確認

Dataproc Serverless バッチのコンソールより、ジョブが完了していることを確認します。
Dtaproc ジョブ実行結果

BigQuery コンソールにて、指定した宛先テーブルにデータが格納されていることを確認します。
出力結果として、データ変換が想定通りに実施されていることが分かります。
BigQuery 出力結果1
BigQuery 出力結果2

まとめ

今回の記事では、Dataproc Serverless テンプレートについてご紹介しました。
このリリースにより、Dataproc Serverless テンプレートよって、コードを記述することなく、Spark バッチワークロードをすばやく実行できるようになりました。
Dataproc Serverless でのデータパイプラインをすばやく構築したい場合に、便利な方法になりますので、是非ご活用ください。

関連記事

Discussion