
Running Cloud Data Fusion ETL from an event trigger

Published 2022/05/11

What is this?

While trying out Data Fusion, I found that data pipelines can be run on a schedule, but I couldn't immediately find a way to run them from an event trigger.
Someone in the article below drives it from Cloud Functions, so I tried it myself. Since production use would mean managing the resources as code, I'm also recording the parts that tripped me up while defining the various resources in Terraform.

What is Data Fusion?

A fully managed ETL tool offered on GCP.

What I ran

As a typical use case, I tried using the upload of a CSV file to GCS as the trigger, transforming it with Data Fusion, and loading the data into BigQuery as the DWH.

Overview

Triggering Data Fusion from a GCS upload is implemented with a Cloud Function that calls the Data Fusion API.
In Data Fusion, the uploaded CSV file is converted to JSON and inserted into BigQuery, where it is then parsed.
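The parsing on the BigQuery side is not covered in this post, but since the pipeline below packs each CSV row into a single JSON string column called answer, a minimal sketch of pulling fields back out with BigQuery's JSON functions might look like the following (assumptions: the dataset.survey table defined later, the google-cloud-bigquery client, and application default credentials):

# Hedged sketch: parse the JSON packed into the `answer` column by the pipeline.
from google.cloud import bigquery

client = bigquery.Client()

query = """
SELECT
  JSON_VALUE(answer, '$.questionaire_id') AS questionaire_id,
  JSON_VALUE(answer, '$.respondent_id')   AS respondent_id,
  JSON_VALUE(answer, '$.q1')              AS q1
FROM `dataset.survey`
"""

for row in client.query(query).result():
    print(row.questionaire_id, row.respondent_id, row.q1)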

Implementation

Data Fusion configuration

The Data Fusion pipeline is defined in the following JSON.

JSON code
datafusion.json
{
    "name": "weekly_survey_etl",
    "description": "Data Pipeline Application",
    "artifact": {
        "name": "cdap-data-pipeline",
        "version": "6.6.0",
        "scope": "SYSTEM"
    },
    "config": {
        "resources": {
            "memoryMB": 2048,
            "virtualCores": 1
        },
        "driverResources": {
            "memoryMB": 2048,
            "virtualCores": 1
        },
        "connections": [
            {
                "from": "GCS",
                "to": "JSONFormatter"
            },
            {
                "from": "JSONFormatter",
                "to": "BigQuery"
            }
        ],
        "comments": [],
        "postActions": [],
        "properties": {},
        "processTimingEnabled": true,
        "stageLoggingEnabled": false,
        "stages": [
            {
                "name": "GCS",
                "plugin": {
                    "name": "GCSFile",
                    "type": "batchsource",
                    "label": "GCS",
                    "artifact": {
                        "name": "google-cloud",
                        "version": "0.19.0",
                        "scope": "SYSTEM"
                    },
                    "properties": {
                        "useConnection": "false",
                        "referenceName": "weekly_survey_csv",
                        "path": "gs://{PROJECT_ID}-input/${my-file}", # {PROJECT_ID}の部分は自分で差し替える
                        "format": "csv",
                        "sampleSize": "1000",
                        "delimiter": ",",
                        "skipHeader": "true",
                        "filenameOnly": "false",
                        "recursive": "false",
                        "encrypted": "false",
                        "fileEncoding": "UTF-8",
                        "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"questionaire_id\",\"type\":\"string\"},{\"name\":\"respondent_id\",\"type\":\"int\"},{\"name\":\"timestamp\",\"type\":\"string\"},{\"name\":\"q1\",\"type\":\"string\"},{\"name\":\"q2\",\"type\":\"string\"},{\"name\":\"q3\",\"type\":\"string\"},{\"name\":\"q4\",\"type\":\"string\"},{\"name\":\"q5\",\"type\":\"string\"}]}",
                        "project": "auto-detect",
                        "serviceAccountType": "filePath",
                        "serviceFilePath": "auto-detect"
                    }
                },
                "outputSchema": [
                    {
                        "name": "etlSchemaBody",
                        "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"questionaire_id\",\"type\":\"string\"},{\"name\":\"respondent_id\",\"type\":\"int\"},{\"name\":\"timestamp\",\"type\":\"string\"},{\"name\":\"q1\",\"type\":\"string\"},{\"name\":\"q2\",\"type\":\"string\"},{\"name\":\"q3\",\"type\":\"string\"},{\"name\":\"q4\",\"type\":\"string\"},{\"name\":\"q5\",\"type\":\"string\"}]}"
                    }
                ],
                "id": "GCS"
            },
            {
                "name": "JSONFormatter",
                "plugin": {
                    "name": "JSONFormatter",
                    "type": "transform",
                    "label": "JSONFormatter",
                    "artifact": {
                        "name": "transform-plugins",
                        "version": "2.8.0",
                        "scope": "SYSTEM"
                    },
                    "properties": {
                        "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"answer\",\"type\":\"string\"}]}"
                    }
                },
                "outputSchema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"answer\",\"type\":\"string\"}]}",
                "inputSchema": [
                    {
                        "name": "GCS",
                        "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"questionaire_id\",\"type\":\"string\"},{\"name\":\"respondent_id\",\"type\":\"int\"},{\"name\":\"timestamp\",\"type\":\"string\"},{\"name\":\"q1\",\"type\":\"string\"},{\"name\":\"q2\",\"type\":\"string\"},{\"name\":\"q3\",\"type\":\"string\"},{\"name\":\"q4\",\"type\":\"string\"},{\"name\":\"q5\",\"type\":\"string\"}]}"
                    }
                ],
                "id": "JSONFormatter",
                "type": "transform",
                "label": "JSONFormatter",
                "icon": "icon-jsonformatter"
            },
            {
                "name": "BigQuery",
                "plugin": {
                    "name": "BigQueryTable",
                    "type": "batchsink",
                    "label": "BigQuery",
                    "artifact": {
                        "name": "google-cloud",
                        "version": "0.19.0",
                        "scope": "SYSTEM"
                    },
                    "properties": {
                        "referenceName": "BigQuery",
                        "project": "{PROJECT_ID}", # {PROJECT_ID}の部分は自分で差し替える
                        "dataset": "dataset",
                        "table": "survey",
                        "serviceAccountType": "filePath",
                        "operation": "insert",
                        "truncateTable": "false",
                        "allowSchemaRelaxation": "false",
                        "location": "US",
                        "createPartitionedTable": "false",
                        "partitioningType": "TIME",
                        "partitionFilterRequired": "false",
                        "schema": "{\"name\":\"etlSchemaBody\",\"type\":\"record\",\"fields\":[{\"name\":\"answer\",\"type\":\"string\"}]}",
                        "serviceFilePath": "auto-detect"
                    }
                },
                "outputSchema": "{\"name\":\"etlSchemaBody\",\"type\":\"record\",\"fields\":[{\"name\":\"answer\",\"type\":\"string\"}]}",
                "inputSchema": [
                    {
                        "name": "JSONFormatter",
                        "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"answer\",\"type\":\"string\"}]}"
                    }
                ],
                "id": "BigQuery",
                "type": "batchsink",
                "label": "BigQuery",
                "icon": "fa-plug"
            }
        ],
        "schedule": "0 1 */1 * *",
        "engine": "spark",
        "numOfRecordsPreview": 100,
        "rangeRecordsPreview": {
            "min": 1,
            "max": "5000"
        },
        "description": "Data Pipeline Application",
        "maxConcurrentRuns": 1
    }
}
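The exported JSON above can also be deployed without going through the UI, via the CDAP lifecycle API (PUT /v3/namespaces/<namespace>/apps/<app>), which is what the Terraform cdap_application resource later does under the hood. A minimal sketch, assuming the JSON is saved locally under the file name referenced in the Terraform and that application default credentials are available; the CDAP_ENDPOINT value is a placeholder to fill in from your instance:

# Hedged sketch: deploy the pipeline JSON through the CDAP lifecycle API.
import google.auth
import google.auth.transport.requests
import requests

CDAP_ENDPOINT = "https://<instance>-<project>-dot-usc1.datafusion.googleusercontent.com/api"  # placeholder
NAMESPACE_ID = "sandbox"
PIPELINE_NAME = "weekly_survey_etl"

credentials, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
credentials.refresh(google.auth.transport.requests.Request())

with open("weekly_survey_etl-cdap-data-pipeline.json") as f:
    spec = f.read()

# PUT the exported pipeline spec to create/update the application.
r = requests.put(
    f"{CDAP_ENDPOINT}/v3/namespaces/{NAMESPACE_ID}/apps/{PIPELINE_NAME}",
    headers={"Authorization": f"Bearer {credentials.token}",
             "Content-Type": "application/json"},
    data=spec,
)
r.raise_for_status()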

Cloud Functions configuration

main.py
# Cloud Functions source file
import json
import os

import requests

def get_access_token():
    # Fetch an access token for the function's service account from the metadata server.
    scopes = 'https://www.googleapis.com/auth/cloud-platform'
    headers = {'Metadata-Flavor': 'Google'}

    api = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token?scopes=" + scopes

    r = requests.get(api, headers=headers).json()

    return r['access_token']

def run_job(data, context):
    # Triggered by google.storage.object.finalize on the input bucket.
    # `data` is the GCS event payload and contains the object name and bucket.

    PROJECT_ID = os.environ.get('PROJECT_ID', 'Specified environment variable is not set.')
    PIPELINE_NAME = os.environ.get('PIPELINE_NAME', 'Specified environment variable is not set.')
    INSTANCE_ID = os.environ.get('INSTANCE_ID', 'Specified environment variable is not set.')
    REGION = os.environ.get('REGION', 'Specified environment variable is not set.')
    NAMESPACE_ID = os.environ.get('NAMESPACE_ID', 'Specified environment variable is not set.')
    CDAP_ENDPOINT = os.environ.get('CDAP_ENDPOINT', 'Specified environment variable is not set.')

    default_file_name = data['name']

    bucket_name = data['bucket']

    auth_token = get_access_token()

    # CDAP lifecycle API: start the pipeline's DataPipelineWorkflow.
    post_endpoint = CDAP_ENDPOINT + "/v3/namespaces/" + NAMESPACE_ID + "/apps/" + PIPELINE_NAME + "/workflows/DataPipelineWorkflow/start"

    # Pass the uploaded file name as the ${my-file} runtime argument (valid JSON body).
    runtime_args = json.dumps({'my-file': default_file_name})

    post_headers = {"Authorization": "Bearer " + auth_token, "Accept": "application/json"}

    r1 = requests.post(post_endpoint, data=runtime_args, headers=post_headers)
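
For reference, a GCS finalize event hands the function a dict containing at least the object name and bucket, so run_job can be smoke-tested with a fake event. A hypothetical sketch with placeholder values; note that get_access_token talks to the metadata server, so this only works when run on GCP (for example on a GCE VM) or with that call stubbed out:

# Hypothetical smoke test for run_job; all values are placeholders.
import os

os.environ.update({
    "PROJECT_ID": "my-project",
    "PIPELINE_NAME": "weekly_survey_etl",
    "INSTANCE_ID": "sasakky-datafusion-instance",
    "REGION": "us-central1",
    "NAMESPACE_ID": "sandbox",
    "CDAP_ENDPOINT": "https://<instance>-<project>-dot-usc1.datafusion.googleusercontent.com/api",
})

from main import run_job

# Minimal shape of a google.storage.object.finalize event payload.
fake_event = {"name": "survey_2022-05-11.csv", "bucket": "my-project-input"}
run_job(fake_event, context=None)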

Infrastructure configuration

Each resource was configured as follows.

Data Fusion

Data Fusion requires creating an instance, a namespace, and a pipeline. For the pipeline, the JSON file with the Data Fusion configuration shown above is read in and deployed.

datafusion
# Data Fusion configuration
resource "google_data_fusion_instance" "instance" {
  name = "sasakky-datafusion-instance"
  project = var.GCP_PROJECT_ID
  region = var.REGION
  type = "BASIC"
  dataproc_service_account = google_service_account.develop.email
}

resource "cdap_namespace" "namespace" {
  name = "sandbox"
}

resource "cdap_application" "pipeline" {
  name = "weekly_survey_etl"
  namespace = cdap_namespace.namespace.name
  spec = file("../src/data_fusion/weekly_survey_etl-cdap-data-pipeline.json")

  depends_on = [
    cdap_namespace.namespace
  ]
}

Cloud Functions and Cloud Storage configuration

Create the bucket the Cloud Function is deployed from ({GCP_PROJECT_ID}-function) and the bucket that triggers the Cloud Function ({GCP_PROJECT_ID}-input). The CSV files are uploaded to the input bucket.

cloudfunction
resource "google_storage_bucket" "function_bucket" {
    name     = "${var.GCP_PROJECT_ID}-function"
    location = var.REGION
}

resource "google_storage_bucket" "input_bucket" {
    name     = "${var.GCP_PROJECT_ID}-input"
    location = var.REGION
}

data "archive_file" "source" {
    type        = "zip"
    source_dir  = "../src/cloud_function"
    output_path = "/tmp/function.zip"
}

resource "google_storage_bucket_object" "zip" {
    source       = data.archive_file.source.output_path
    content_type = "application/zip"

    name         = "src-${data.archive_file.source.output_md5}.zip"
    bucket       = google_storage_bucket.function_bucket.name

    depends_on   = [
        google_storage_bucket.function_bucket, 
        data.archive_file.source
    ]
}

resource "google_cloudfunctions_function" "function" {
    name                  = "weekly_survey_etl_function"
    runtime               = "python37"  # of course changeable

    source_archive_bucket = google_storage_bucket.function_bucket.name
    source_archive_object = google_storage_bucket_object.zip.name

    entry_point           = "run_job"
    service_account_email = google_service_account.develop.email

    environment_variables = {
      INSTANCE_ID = google_data_fusion_instance.instance.name,
      REGION = var.REGION,
      PIPELINE_NAME = cdap_application.pipeline.name,
      PROJECT_ID = var.GCP_PROJECT_ID,
      NAMESPACE_ID = cdap_namespace.namespace.name,
      CDAP_ENDPOINT = "https://${google_data_fusion_instance.instance.name}-${var.GCP_PROJECT_ID}-dot-usc1.datafusion.googleusercontent.com/api"
    }
    
    event_trigger {
        event_type = "google.storage.object.finalize"
        resource   = "${var.GCP_PROJECT_ID}-input"
    }

    depends_on            = [
        google_storage_bucket.function_bucket,  
        google_storage_bucket_object.zip,
        google_data_fusion_instance.instance,
        cdap_namespace.namespace,
        cdap_application.pipeline,
        google_service_account.develop
    ]
}
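
Once terraform apply has finished, the whole chain can be exercised by dropping a CSV into the input bucket; the finalize event then invokes the function, which starts the pipeline. A minimal sketch using the google-cloud-storage client, where the project ID and file names are placeholders:

# Hypothetical end-to-end trigger: upload a local CSV to the input bucket.
from google.cloud import storage

PROJECT_ID = "my-project"  # placeholder

client = storage.Client(project=PROJECT_ID)
bucket = client.bucket(f"{PROJECT_ID}-input")
bucket.blob("survey_2022-05-11.csv").upload_from_filename("survey.csv")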

Service account for the Cloud Function

The following roles were granted.

Terraform configuration for IAM
iam
# Create the service account and grant roles
resource "google_service_account" "develop" {
  project = var.GCP_PROJECT_ID
  account_id = "develop"
  display_name = "develop"
}

resource "google_project_iam_member" "develop_cloudfunction_invoker" {
  project = var.GCP_PROJECT_ID
  role = "roles/cloudfunctions.invoker"
  member = "serviceAccount:${google_service_account.develop.email}"
}

resource "google_project_iam_member" "develop_datafusion_admin" {
  project = var.GCP_PROJECT_ID
  role = "roles/datafusion.admin"
  member = "serviceAccount:${google_service_account.develop.email}"
}

resource "google_project_iam_member" "develop_storage_admin" {
  project = var.GCP_PROJECT_ID
  role = "roles/storage.admin"
  member = "serviceAccount:${google_service_account.develop.email}"
}

resource "google_project_iam_member" "develop_bigquery_editer" {
  project = var.GCP_PROJECT_ID
  role = "roles/bigquery.dataEditor"
  member = "serviceAccount:${google_service_account.develop.email}"
}

resource "google_project_iam_member" "develop_bigquery_jobUser" {
  project = var.GCP_PROJECT_ID
  role = "roles/bigquery.jobUser"
  member = "serviceAccount:${google_service_account.develop.email}"
}

resource "google_project_iam_member" "develop_serviceAccountUser" {
  project = var.GCP_PROJECT_ID
  role = "roles/iam.serviceAccountUser"
  member = "serviceAccount:${google_service_account.develop.email}"
}

resource "google_project_iam_member" "develop_dataproc_worker" {
  project = var.GCP_PROJECT_ID
  role = "roles/dataproc.worker"
  member = "serviceAccount:${google_service_account.develop.email}"
}

resource "google_project_iam_member" "sa-datafusion_serviceAccountUser" {
  project = var.GCP_PROJECT_ID
  role = "roles/iam.serviceAccountUser"
  member = "serviceAccount:service-{PROJECT_NO}@gcp-sa-datafusion.iam.gserviceaccount.com"
}

Discussion

How the event trigger works

The Cloud Function is wired to Data Fusion as follows:
the Cloud Function picks up the bucket and file name of the object uploaded to GCS and passes the file name as an argument when calling the Data Fusion API.

main.py
    default_file_name = data['name']

    bucket_name = data['bucket']

    runtime_args = json.dumps({'my-file': default_file_name})

    r1 = requests.post(post_endpoint, data=runtime_args, headers=post_headers)

On the Data Fusion side, the argument passed along with the API call is received as ${my-file} and used to specify the location of the data to read.

datafusion.json
"path": "gs://{PROJECT_ID}-input/${my-file}",

Notes on IAM permissions

When setting up IAM with Terraform, it was not enough to grant roles to the service account specified in Data Fusion; the following service account also needed the Service Account User role, otherwise an error occurred.

iam.tf
resource "google_project_iam_member" "sa-datafusion_serviceAccountUser" {
  project = var.GCP_PROJECT_ID
  role = "roles/iam.serviceAccountUser"
  member = "serviceAccount:service-{PROJECT_NO}@gcp-sa-datafusion.iam.gserviceaccount.com"
}

Judging from this StackOverflow thread, the process seems to be executed by a different service account (the Data Fusion service agent).
