🐻

Application Integration で FTP サーバから BigQuery への連携を構築してみた

に公開

はじめに

こんにちは、クラウドエースのベアです。
今回は Application Integration を使用して FTP サーバーから CSV ファイルをダウンロードし、Cloud Storage を経由して BigQuery に格納するデータ連携を行いたいと思います。

Application Integration について

Application Integration とは、Google Cloud が提供する iPaaS (Integration Platform as a Service) で様々なアプリケーションやサービスを連携させ、業務プロセスを自動化できるプロダクトです。

Application Integration は GUI ベースで直感的に操作ができ、普段コーディングをしない人でも簡単にシステム構築が可能です。

https://cloud.google.com/application-integration?hl=ja

同プロダクトについて弊社の過去の記事でも扱っているので参考にしてみてください。
https://zenn.dev/cloud_ace/articles/dde2db4ae5c49b

Integration Connectors について

様々なアプリケーションやサービスとの接続は Integration Connectors を利用することで実現が可能です。
設定にはコードは不要で、Application Integration と組み合わせることでワークフロー上で異なるシステムからデータを取り込み、変換し、別のシステムに書き込みといった一連のプロセスを視覚的に実装することができます。
接続はコネクタという単位で管理されています。

今回は FTP サーバーと Cloud Storage 、 BigQuery 用の 3 つのコネクタを作成しました。

Application Integration でのデータ連携処理構築

FTP サーバー → Cloud Storage → BigQuery で行うデータ連携処理を Application Integration 内で構築する流れを記述していきます。

Integration を作成する

Application Integration を開いて、 Integration を作成します。

TRIGGER と TASKS からフローを作成する

次にデータ連携のフローを Application Integration 内で作っていきます。
作成した Integration を開いて

「TRIGGERS」ボタンと「TASKS」ボタンから画像例のように Trigger、DataMapping、ForEachLoop、Connector を置いていき、点と点を矢印で繋ぎます。

ここで TRIGGER と TASK について説明します。
TRIGGER とはスケジューラー等の外部からの呼び出しによってタスクを開始できるトリガーです。
今回使用している TRIGGER は以下の通りです。

TRIGGER 説明
ScheduleTrigger 特定の間隔で定期的にジョブを実行するためのトリガーです。
API Trigger 外部イベントが発信された時に、特定の Application Integration のフローを起動させるための仕組みです。

TASKS はあらかじめ定義されたタスクを実行します。各タスクは一連のフローの中で入力を受け取り、結果を出力します。
今回使用している TASKS は以下の通りです。

TASKS 説明
DataMapping Application Integration 内で扱っているデータの抽出・変換を行うことができます。
ForEachLoop 現在(メイン)の統合からサブ統合を繰り返し呼び出すことができます。
Connector Integration Connectors で接続する Google Cloud や外部のサービスと連携を行うことができます。

FTP サーバーから CSV ファイル名一覧を取得する

ここでは FTP サーバーから CSV ファイル名の一覧を取得する処理を行っていきます。
Connector の Entity は List とします。

使用しているパラメータは以下の通りです。

パラメータ名 説明
filterClause 条件に基づいてオペレーションの結果を制限します。

入力パラメータの詳細は以下のドキュメントに記載があります。
https://cloud.google.com/application-integration/docs/configure-connectors-task?hl=ja#input-parameters

For Each Loop でループ処理を行う

ここではファイル名一覧取得で得たファイルの数の回数ループ処理を実行します。
今回はファイル名一覧取得の connectorOutputPayload をそのまま For Each Loop に渡し、
API Trigger を呼び出してループ処理を行っています。

FTP サーバーから CSV ファイルを取得する

ここでは FTP サーバーからのファイルダウンロードの処理を行います。
Connector の Action は Download です。

使用しているパラメータは以下の通りです。

パラメータ名 説明
RemoteFile リモートホスト上のファイル名。
HasBytes コンテンツをバイトとしてダウンロードするかどうかを指定します。デフォルト値は false です。

このパラメータを DataMapping で設定して Connector に渡します。

パラメータの詳細は以下のドキュメントに記載があります。
https://cloud.google.com/integration-connectors/docs/connectors/ftp/configure?hl=ja#actions-exp

FTP サーバーから Cloud Storage へアップロードする

ここでは取得したファイルを Cloud Storage へアップロードする処理を行います。
Connector の Action は UploadObject です。

使用しているパラメータは以下の通りです。

パラメータ名 説明
Bucket オブジェクトがアップロードされるバケット名。
ContentBytes バイト形式(Base64 でエンコードされた文字列)でアップロードするコンテンツ。
HasBytes コンテンツをバイトとしてアップロードするかどうか。
ObjectName アップロードするオブジェクトの名前。

このパラメータを DataMapping で設定してConnector に渡します。

パラメータの詳細は以下のドキュメントに記載があります。
https://cloud.google.com/integration-connectors/docs/connectors/cloudstorage/configure?hl=ja#upld

Cloud Storage から BigQuery へのデータ挿入

ここでは Cloud Storage から BigQuery へファイルのデータを挿入する処理を行っていきます。
Connector の Action は InsertLoadJob です。

使用しているパラメータは以下の通りです。

パラメータ名 説明
SourceURIs Google Cloud Storage URI のスペース区切りのリスト。
SourceFormat ファイルのソース形式。サポートされている値は次のとおりです。AVRO、NEWLINE_DELIMITED_JSON DATASTORE_BACKUP、PARQUET、ORC、CSV
DestinationTable クエリの宛先テーブル(DestProjectId.DestDatasetId.DestTable 形式)。
CSVSkipLeadingRows CSV ファイルの先頭でスキップする行数を指定します。通常、ヘッダー行をスキップするために使用します。
Autodetect JSON ファイルと CSV ファイルのオプションとスキーマを自動的に決定するかどうかを指定します。
WriteDisposition 宛先テーブルにデータを書き込む方法を指定します。既存の結果を切り捨てる、既存の結果を追加する、テーブルが空の場合にのみ書き込みを行うなどです。
DestinationTableSchema テーブルの作成に使用するフィールドを指定する JSON リスト。
CSVAllowQuotedNewlines CSV ファイルに引用符で囲まれたフィールド内に改行を含めることができるかどうかを指定します。
デフォルト値は false です。

このパラメータを DataMapping で設定してConnector に渡します。

パラメータの詳細は以下ドキュメントに記載があります。
https://cloud.google.com/integration-connectors/docs/connectors/bigquery/configure?hl=ja#use-integration

実行

CSV ファイルを FTP サーバーに用意をして、実行します。
今回の記事で使用する CSV は Gemini で作成しました。実際に Application Integration 上で CSV を BigQuery テーブルに読み込むためには、スキーマを用意しておく必要があります。本記事では事前にスキーマを用意していました。

Application Integration にはスケジューラーによる実行など様々な方法があります。今回は手動実行が可能なテスト機能による実行を行いました。

https://cloud.google.com/application-integration/docs/test-publish-integrations?hl=ja
実行後、画像のようにデータ格納されていました。
複数ファイル分データが格納されているため、ループ処理がちゃんと走っていたことが分かります。
以下は Cloud Storage に連携された CSV ファイルです。

以下は BigQuery に連携されたデータです。

構築した Application Integration のサンプル

以下に Application Integration のエクスポート機能で出力した構成のサンプルを貼っておきます。
インポート機能を使って確認し、構築時の参考にしてみてください。

JSON サンプル
{
  "name": "projects/563858054132/locations/asia-northeast1/integrations/ftp-zenn/versions/2a516e74-8524-478f-8039-0d9bdb474e22",
  "description": "Zenn記事執筆検証用",
  "updateTime": "2025-05-30T03:47:54.648Z",
  "createTime": "2025-05-22T05:47:36.957Z",
  "triggerConfigs": [
    {
      "label": "Schedule Trigger",
      "startTasks": [
        {
          "taskId": "3"
        }
      ],
      "properties": {
        "Timer Name": "ftp-zenn_Timer_1",
        "Scheduled Time spec": "30 2 * * *"
      },
      "triggerType": "CRON",
      "triggerNumber": "1",
      "triggerId": "cron_trigger/ftp-zenn_Timer_1/30+2+*+*+*",
      "position": {
        "x": -304,
        "y": -279
      }
    },
    {
      "label": "API Trigger",
      "startTasks": [
        {
          "taskId": "2"
        }
      ],
      "properties": {
        "Trigger name": "ftp-zenn_API_1"
      },
      "triggerType": "API",
      "triggerNumber": "2",
      "triggerId": "api_trigger/ftp-zenn_API_1",
      "position": {
        "x": 38,
        "y": -279
      },
      "inputVariables": {
        "names": [
          "fileInformation",
          "schemas",
          "upsertQueries"
        ]
      },
      "outputVariables": {}
    }
  ],
  "taskConfigs": [
    {
      "task": "GenericConnectorTask",
      "taskId": "1",
      "parameters": {
        "listEntitiesSortByColumns": {
          "key": "listEntitiesSortByColumns",
          "value": {
            "stringValue": "$`Task_1_listEntitiesSortByColumns`$"
          }
        },
        "filterClause": {
          "key": "filterClause",
          "value": {
            "stringValue": "$`Task_1_filterClause`$"
          }
        },
        "listEntitiesPageToken": {
          "key": "listEntitiesPageToken",
          "value": {
            "stringValue": "$`Task_1_listEntitiesPageToken`$"
          }
        },
        "entityType": {
          "key": "entityType",
          "value": {
            "stringValue": "home"
          }
        },
        "listEntitiesPageSize": {
          "key": "listEntitiesPageSize",
          "value": {
            "stringValue": "$`Task_1_listEntitiesPageSize`$"
          }
        },
        "authOverrideEnabled": {
          "key": "authOverrideEnabled",
          "value": {
            "booleanValue": false
          }
        },
        "listEntitiesNextPageToken": {
          "key": "listEntitiesNextPageToken",
          "value": {
            "stringValue": "$`Task_1_listEntitiesNextPageToken`$"
          }
        },
        "connectionName": {
          "key": "connectionName",
          "value": {
            "stringValue": "projects/ca-ftp-zenn/locations/asia-northeast1/connections/ftp-connection-test"
          }
        },
        "connectorOutputPayload": {
          "key": "connectorOutputPayload",
          "value": {
            "stringValue": "$`Task_1_connectorOutputPayload`$"
          }
        },
        "operation": {
          "key": "operation",
          "value": {
            "stringValue": "LIST_ENTITIES"
          }
        },
        "connectionVersion": {
          "key": "connectionVersion",
          "value": {
            "stringValue": "projects/ca-ftp-zenn/locations/global/providers/default/connectors/ftp/versions/1"
          }
        }
      },
      "nextTasks": [
        {
          "taskId": "10"
        }
      ],
      "taskExecutionStrategy": "WHEN_ALL_SUCCEED",
      "displayName": "FTP CSVファイル名一覧取得",
      "externalTaskType": "NORMAL_TASK",
      "position": {
        "x": -307,
        "y": -51
      }
    },
    {
      "task": "FieldMappingTask",
      "taskId": "10",
      "parameters": {
        "FieldMappingConfigTaskParameterKey": {
          "key": "FieldMappingConfigTaskParameterKey",
          "value": {
            "jsonValue": "{\n  \"@type\": \"type.googleapis.com/enterprise.crm.eventbus.proto.FieldMappingConfig\",\n  \"mappedFields\": [{\n    \"inputField\": {\n      \"fieldType\": \"JSON_VALUE\",\n      \"transformExpression\": {\n        \"initialValue\": {\n          \"referenceValue\": \"$schemas$\"\n        },\n        \"transformationFunctions\": [{\n          \"functionType\": {\n            \"jsonFunction\": {\n              \"functionName\": \"SET_PROPERTY\"\n            }\n          },\n          \"parameters\": [{\n            \"initialValue\": {\n              \"referenceValue\": \"$coffeeOrderSchema$\"\n            }\n          }, {\n            \"initialValue\": {\n              \"literalValue\": {\n                \"stringValue\": \"coffee_order\"\n              }\n            }\n          }]\n        }, {\n          \"functionType\": {\n            \"jsonFunction\": {\n              \"functionName\": \"SET_PROPERTY\"\n            }\n          },\n          \"parameters\": [{\n            \"initialValue\": {\n              \"referenceValue\": \"$salesReportSchema$\"\n            }\n          }, {\n            \"initialValue\": {\n              \"literalValue\": {\n                \"stringValue\": \"sales_report\"\n              }\n            }\n          }]\n        }]\n      }\n    },\n    \"outputField\": {\n      \"referenceKey\": \"$schemas$\",\n      \"fieldType\": \"JSON_VALUE\",\n      \"cardinality\": \"OPTIONAL\"\n    }\n  }]\n}"
          }
        }
      },
      "nextTasks": [
        {
          "taskId": "5"
        }
      ],
      "taskExecutionStrategy": "WHEN_ALL_SUCCEED",
      "displayName": "Data Mapping 1",
      "externalTaskType": "NORMAL_TASK",
      "position": {
        "x": -311,
        "y": 61
      }
    },
    {
      "task": "FieldMappingTask",
      "taskId": "2",
      "parameters": {
        "FieldMappingConfigTaskParameterKey": {
          "key": "FieldMappingConfigTaskParameterKey",
          "value": {
            "jsonValue": "{\n  \"@type\": \"type.googleapis.com/enterprise.crm.eventbus.proto.FieldMappingConfig\",\n  \"mappedFields\": [{\n    \"inputField\": {\n      \"fieldType\": \"JSON_VALUE\",\n      \"transformExpression\": {\n        \"initialValue\": {\n          \"referenceValue\": \"$fileInformation$\"\n        },\n        \"transformationFunctions\": [{\n          \"functionType\": {\n            \"jsonFunction\": {\n              \"functionName\": \"GET_PROPERTY\"\n            }\n          },\n          \"parameters\": [{\n            \"initialValue\": {\n              \"literalValue\": {\n                \"stringValue\": \"FilePath\"\n              }\n            }\n          }]\n        }]\n      }\n    },\n    \"outputField\": {\n      \"referenceKey\": \"$`Task_4_connectorInputPayload`.RemoteFile$\",\n      \"fieldType\": \"STRING_VALUE\",\n      \"cardinality\": \"OPTIONAL\"\n    }\n  }, {\n    \"inputField\": {\n      \"fieldType\": \"BOOLEAN_VALUE\",\n      \"transformExpression\": {\n        \"initialValue\": {\n          \"literalValue\": {\n            \"booleanValue\": true\n          }\n        }\n      }\n    },\n    \"outputField\": {\n      \"referenceKey\": \"$`Task_4_connectorInputPayload`.HasBytes$\",\n      \"fieldType\": \"JSON_VALUE\",\n      \"cardinality\": \"OPTIONAL\"\n    }\n  }]\n}"
          }
        }
      },
      "nextTasks": [
        {
          "taskId": "4"
        }
      ],
      "taskExecutionStrategy": "WHEN_ALL_SUCCEED",
      "displayName": "Data Mapping for FTP",
      "externalTaskType": "NORMAL_TASK",
      "position": {
        "x": 35,
        "y": -179
      }
    },
    {
      "task": "FieldMappingTask",
      "taskId": "3",
      "parameters": {
        "FieldMappingConfigTaskParameterKey": {
          "key": "FieldMappingConfigTaskParameterKey",
          "value": {
            "jsonValue": "{\n  \"@type\": \"type.googleapis.com/enterprise.crm.eventbus.proto.FieldMappingConfig\",\n  \"mappedFields\": [{\n    \"inputField\": {\n      \"fieldType\": \"STRING_VALUE\",\n      \"transformExpression\": {\n        \"initialValue\": {\n          \"literalValue\": {\n            \"stringValue\": \"FilePath LIKE \\u0027/%.csv\\u0027\"\n          }\n        }\n      }\n    },\n    \"outputField\": {\n      \"referenceKey\": \"$`Task_1_filterClause`$\",\n      \"fieldType\": \"STRING_VALUE\",\n      \"cardinality\": \"OPTIONAL\"\n    }\n  }]\n}"
          }
        }
      },
      "nextTasks": [
        {
          "taskId": "1"
        }
      ],
      "taskExecutionStrategy": "WHEN_ALL_SUCCEED",
      "displayName": "Data Mapping",
      "externalTaskType": "NORMAL_TASK",
      "position": {
        "x": -307,
        "y": -167
      }
    },
    {
      "task": "GenericConnectorTask",
      "taskId": "4",
      "parameters": {
        "connectorInputPayload": {
          "key": "connectorInputPayload",
          "value": {
            "stringValue": "$`Task_4_connectorInputPayload`$"
          }
        },
        "authOverrideEnabled": {
          "key": "authOverrideEnabled",
          "value": {
            "booleanValue": false
          }
        },
        "connectionName": {
          "key": "connectionName",
          "value": {
            "stringValue": "projects/ca-ftp-zenn/locations/asia-northeast1/connections/ftp-connection-test"
          }
        },
        "connectorOutputPayload": {
          "key": "connectorOutputPayload",
          "value": {
            "stringValue": "$`Task_4_connectorOutputPayload`$"
          }
        },
        "operation": {
          "key": "operation",
          "value": {
            "stringValue": "EXECUTE_ACTION"
          }
        },
        "connectionVersion": {
          "key": "connectionVersion",
          "value": {
            "stringValue": "projects/ca-ftp-zenn/locations/global/providers/default/connectors/ftp/versions/1"
          }
        },
        "actionName": {
          "key": "actionName",
          "value": {
            "stringValue": "Download"
          }
        }
      },
      "nextTasks": [
        {
          "taskId": "6"
        }
      ],
      "taskExecutionStrategy": "WHEN_ALL_SUCCEED",
      "displayName": "CSVファイルダウンロード",
      "externalTaskType": "NORMAL_TASK",
      "position": {
        "x": 36,
        "y": -66
      }
    },
    {
      "task": "SubWorkflowForEachLoopV2Task",
      "taskId": "5",
      "parameters": {
        "iterationElementMapping": {
          "key": "iterationElementMapping",
          "value": {
            "stringValue": "fileInformation"
          }
        },
        "triggerId": {
          "key": "triggerId",
          "value": {
            "stringValue": "api_trigger/ftp-zenn_API_1"
          }
        },
        "aggregatorParameterMapping": {
          "key": "aggregatorParameterMapping"
        },
        "loopMetadata": {
          "key": "loopMetadata",
          "value": {
            "stringArray": {
              "stringValues": [
                "$`Task_5_loopMetadata`$"
              ]
            }
          }
        },
        "disableEucPropagation": {
          "key": "disableEucPropagation",
          "value": {
            "booleanValue": false
          }
        },
        "listToIterate": {
          "key": "listToIterate",
          "value": {
            "stringValue": "$`Task_1_connectorOutputPayload`$"
          }
        },
        "workflowName": {
          "key": "workflowName",
          "value": {
            "stringValue": "ftp-zenn"
          }
        },
        "requestParameterMapping": {
          "key": "requestParameterMapping",
          "value": {
            "jsonValue": "{\n  \"@type\": \"type.googleapis.com/enterprise.crm.eventbus.proto.ParameterMap\",\n  \"entries\": [{\n    \"key\": {\n      \"literalValue\": {\n        \"stringValue\": \"schemas\"\n      }\n    },\n    \"value\": {\n      \"literalValue\": {\n        \"stringValue\": \"schemas\"\n      }\n    }\n  }]\n}"
          }
        },
        "overrideParameterMapping": {
          "key": "overrideParameterMapping"
        }
      },
      "taskExecutionStrategy": "WHEN_ALL_SUCCEED",
      "displayName": "For Each Loop",
      "externalTaskType": "NORMAL_TASK",
      "position": {
        "x": -310,
        "y": 169
      }
    },
    {
      "task": "FieldMappingTask",
      "taskId": "6",
      "parameters": {
        "FieldMappingConfigTaskParameterKey": {
          "key": "FieldMappingConfigTaskParameterKey",
          "value": {
            "jsonValue": "{\n  \"@type\": \"type.googleapis.com/enterprise.crm.eventbus.proto.FieldMappingConfig\",\n  \"mappedFields\": [{\n    \"inputField\": {\n      \"fieldType\": \"JSON_VALUE\",\n      \"transformExpression\": {\n        \"initialValue\": {\n          \"referenceValue\": \"$`Task_4_connectorOutputPayload`$\"\n        },\n        \"transformationFunctions\": [{\n          \"functionType\": {\n            \"jsonFunction\": {\n              \"functionName\": \"GET_ELEMENT\"\n            }\n          },\n          \"parameters\": [{\n            \"initialValue\": {\n              \"literalValue\": {\n                \"intValue\": \"1\"\n              }\n            }\n          }]\n        }, {\n          \"functionType\": {\n            \"jsonFunction\": {\n              \"functionName\": \"GET_PROPERTY\"\n            }\n          },\n          \"parameters\": [{\n            \"initialValue\": {\n              \"literalValue\": {\n                \"stringValue\": \"ContentBytes\"\n              }\n            }\n          }]\n        }]\n      }\n    },\n    \"outputField\": {\n      \"referenceKey\": \"$`Task_7_connectorInputPayload`.ContentBytes$\",\n      \"fieldType\": \"JSON_VALUE\",\n      \"cardinality\": \"OPTIONAL\"\n    }\n  }, {\n    \"inputField\": {\n      \"fieldType\": \"JSON_VALUE\",\n      \"transformExpression\": {\n        \"initialValue\": {\n          \"referenceValue\": \"$fileInformation$\"\n        },\n        \"transformationFunctions\": [{\n          \"functionType\": {\n            \"jsonFunction\": {\n              \"functionName\": \"GET_PROPERTY\"\n            }\n          },\n          \"parameters\": [{\n            \"initialValue\": {\n              \"literalValue\": {\n                \"stringValue\": \"Filename\"\n              }\n            }\n          }]\n        }]\n      }\n    },\n    \"outputField\": {\n      \"referenceKey\": \"$fileName$\",\n      \"fieldType\": \"STRING_VALUE\",\n      \"cardinality\": \"OPTIONAL\"\n    }\n  }, {\n    \"inputField\": {\n      \"fieldType\": \"STRING_VALUE\",\n      \"transformExpression\": {\n        \"initialValue\": {\n          \"referenceValue\": \"$fileName$\"\n        }\n      }\n    },\n    \"outputField\": {\n      \"referenceKey\": \"$`Task_7_connectorInputPayload`.ObjectName$\",\n      \"fieldType\": \"JSON_VALUE\",\n      \"cardinality\": \"OPTIONAL\"\n    }\n  }, {\n    \"inputField\": {\n      \"fieldType\": \"BOOLEAN_VALUE\",\n      \"transformExpression\": {\n        \"initialValue\": {\n          \"literalValue\": {\n            \"booleanValue\": true\n          }\n        }\n      }\n    },\n    \"outputField\": {\n      \"referenceKey\": \"$`Task_7_connectorInputPayload`.HasBytes$\",\n      \"fieldType\": \"JSON_VALUE\",\n      \"cardinality\": \"OPTIONAL\"\n    }\n  }, {\n    \"inputField\": {\n      \"fieldType\": \"STRING_VALUE\",\n      \"transformExpression\": {\n        \"initialValue\": {\n          \"literalValue\": {\n            \"stringValue\": \"ftp-sales-data\"\n          }\n        }\n      }\n    },\n    \"outputField\": {\n      \"referenceKey\": \"$`Task_7_connectorInputPayload`.Bucket$\",\n      \"fieldType\": \"STRING_VALUE\",\n      \"cardinality\": \"OPTIONAL\"\n    }\n  }]\n}"
          }
        }
      },
      "nextTasks": [
        {
          "taskId": "7"
        }
      ],
      "taskExecutionStrategy": "WHEN_ALL_SUCCEED",
      "displayName": "CSV to Cloud Storage",
      "externalTaskType": "NORMAL_TASK",
      "position": {
        "x": 34,
        "y": 51
      }
    },
    {
      "task": "GenericConnectorTask",
      "taskId": "7",
      "parameters": {
        "connectorInputPayload": {
          "key": "connectorInputPayload",
          "value": {
            "stringValue": "$`Task_7_connectorInputPayload`$"
          }
        },
        "authOverrideEnabled": {
          "key": "authOverrideEnabled",
          "value": {
            "booleanValue": false
          }
        },
        "connectionName": {
          "key": "connectionName",
          "value": {
            "stringValue": "projects/ca-ftp-zenn/locations/asia-northeast1/connections/gcs-connector"
          }
        },
        "connectorOutputPayload": {
          "key": "connectorOutputPayload",
          "value": {
            "stringValue": "$`Task_7_connectorOutputPayload`$"
          }
        },
        "operation": {
          "key": "operation",
          "value": {
            "stringValue": "EXECUTE_ACTION"
          }
        },
        "connectionVersion": {
          "key": "connectionVersion",
          "value": {
            "stringValue": "projects/ca-ftp-zenn/locations/global/providers/gcp/connectors/gcs/versions/1"
          }
        },
        "actionName": {
          "key": "actionName",
          "value": {
            "stringValue": "UploadObject"
          }
        }
      },
      "nextTasks": [
        {
          "taskId": "8"
        }
      ],
      "taskExecutionStrategy": "WHEN_ALL_SUCCEED",
      "displayName": "Cloud Storage",
      "externalTaskType": "NORMAL_TASK",
      "position": {
        "x": 35,
        "y": 148
      }
    },
    {
      "task": "FieldMappingTask",
      "taskId": "8",
      "parameters": {
        "FieldMappingConfigTaskParameterKey": {
          "key": "FieldMappingConfigTaskParameterKey",
          "value": {
            "jsonValue": "{\n  \"@type\": \"type.googleapis.com/enterprise.crm.eventbus.proto.FieldMappingConfig\",\n  \"mappedFields\": [{\n    \"inputField\": {\n      \"fieldType\": \"STRING_VALUE\",\n      \"transformExpression\": {\n        \"initialValue\": {\n          \"referenceValue\": \"$fileName$\"\n        },\n        \"transformationFunctions\": [{\n          \"functionType\": {\n            \"stringFunction\": {\n              \"functionName\": \"SPLIT\"\n            }\n          },\n          \"parameters\": [{\n            \"initialValue\": {\n              \"literalValue\": {\n                \"stringValue\": \".\"\n              }\n            }\n          }]\n        }, {\n          \"functionType\": {\n            \"stringArrayFunction\": {\n              \"functionName\": \"GET\"\n            }\n          },\n          \"parameters\": [{\n            \"initialValue\": {\n              \"literalValue\": {\n                \"intValue\": \"0\"\n              }\n            }\n          }]\n        }]\n      }\n    },\n    \"outputField\": {\n      \"referenceKey\": \"$fileType$\",\n      \"fieldType\": \"STRING_VALUE\",\n      \"cardinality\": \"OPTIONAL\"\n    }\n  }, {\n    \"inputField\": {\n      \"fieldType\": \"STRING_VALUE\",\n      \"transformExpression\": {\n        \"initialValue\": {\n          \"literalValue\": {\n            \"stringValue\": \"gs://ftp-sales-data/\"\n          }\n        },\n        \"transformationFunctions\": [{\n          \"functionType\": {\n            \"stringFunction\": {\n              \"functionName\": \"CONCAT\"\n            }\n          },\n          \"parameters\": [{\n            \"initialValue\": {\n              \"referenceValue\": \"$fileName$\"\n            }\n          }]\n        }]\n      }\n    },\n    \"outputField\": {\n      \"referenceKey\": \"$`Task_9_connectorInputPayload`.SourceURIs$\",\n      \"fieldType\": \"JSON_VALUE\",\n      \"cardinality\": \"OPTIONAL\"\n    }\n  }, {\n    \"inputField\": {\n      \"fieldType\": \"STRING_VALUE\",\n      \"transformExpression\": {\n        \"initialValue\": {\n          \"literalValue\": {\n            \"stringValue\": \"ca-ftp-zenn.\"\n          }\n        },\n        \"transformationFunctions\": [{\n          \"functionType\": {\n            \"stringFunction\": {\n              \"functionName\": \"CONCAT\"\n            }\n          },\n          \"parameters\": [{\n            \"initialValue\": {\n              \"literalValue\": {\n                \"stringValue\": \"test_sales_data.\"\n              }\n            }\n          }]\n        }, {\n          \"functionType\": {\n            \"stringFunction\": {\n              \"functionName\": \"CONCAT\"\n            }\n          },\n          \"parameters\": [{\n            \"initialValue\": {\n              \"referenceValue\": \"$fileType$\"\n            }\n          }]\n        }]\n      }\n    },\n    \"outputField\": {\n      \"referenceKey\": \"$`Task_9_connectorInputPayload`.DestinationTable$\",\n      \"fieldType\": \"JSON_VALUE\",\n      \"cardinality\": \"OPTIONAL\"\n    }\n  }, {\n    \"inputField\": {\n      \"fieldType\": \"STRING_VALUE\",\n      \"transformExpression\": {\n        \"initialValue\": {\n          \"literalValue\": {\n            \"stringValue\": \"CSV\"\n          }\n        }\n      }\n    },\n    \"outputField\": {\n      \"referenceKey\": \"$`Task_9_connectorInputPayload`.SourceFormat$\",\n      \"fieldType\": \"JSON_VALUE\",\n      \"cardinality\": \"OPTIONAL\"\n    }\n  }, {\n    \"inputField\": {\n      \"fieldType\": \"STRING_VALUE\",\n      \"transformExpression\": {\n        \"initialValue\": {\n          \"literalValue\": {\n            \"stringValue\": \"1\"\n          }\n        }\n      }\n    },\n    \"outputField\": {\n      \"referenceKey\": \"$`Task_9_connectorInputPayload`.CSVSkipLeadingRows$\",\n      \"fieldType\": \"JSON_VALUE\",\n      \"cardinality\": \"OPTIONAL\"\n    }\n  }, {\n    \"inputField\": {\n      \"fieldType\": \"STRING_VALUE\",\n      \"transformExpression\": {\n        \"initialValue\": {\n          \"literalValue\": {\n            \"stringValue\": \"false\"\n          }\n        }\n      }\n    },\n    \"outputField\": {\n      \"referenceKey\": \"$`Task_9_connectorInputPayload`.Autodetect$\",\n      \"fieldType\": \"JSON_VALUE\",\n      \"cardinality\": \"OPTIONAL\"\n    }\n  }, {\n    \"inputField\": {\n      \"fieldType\": \"STRING_VALUE\",\n      \"transformExpression\": {\n        \"initialValue\": {\n          \"literalValue\": {\n            \"stringValue\": \"WRITE_TRUNCATE\"\n          }\n        }\n      }\n    },\n    \"outputField\": {\n      \"referenceKey\": \"$`Task_9_connectorInputPayload`.WriteDisposition$\",\n      \"fieldType\": \"JSON_VALUE\",\n      \"cardinality\": \"OPTIONAL\"\n    }\n  }, {\n    \"inputField\": {\n      \"fieldType\": \"STRING_VALUE\",\n      \"transformExpression\": {\n        \"initialValue\": {\n          \"referenceValue\": \"$schemas$\"\n        },\n        \"transformationFunctions\": [{\n          \"functionType\": {\n            \"jsonFunction\": {\n              \"functionName\": \"GET_PROPERTY\"\n            }\n          },\n          \"parameters\": [{\n            \"initialValue\": {\n              \"referenceValue\": \"$fileType$\"\n            }\n          }]\n        }, {\n          \"functionType\": {\n            \"jsonFunction\": {\n              \"functionName\": \"TO_STRING\"\n            }\n          }\n        }]\n      }\n    },\n    \"outputField\": {\n      \"referenceKey\": \"$`Task_9_connectorInputPayload`.DestinationTableSchema$\",\n      \"fieldType\": \"JSON_VALUE\",\n      \"cardinality\": \"OPTIONAL\"\n    }\n  }, {\n    \"inputField\": {\n      \"fieldType\": \"STRING_VALUE\",\n      \"transformExpression\": {\n        \"initialValue\": {\n          \"literalValue\": {\n            \"stringValue\": \"true\"\n          }\n        }\n      }\n    },\n    \"outputField\": {\n      \"referenceKey\": \"$`Task_9_connectorInputPayload`.CSVAllowQuotedNewlines$\",\n      \"fieldType\": \"JSON_VALUE\",\n      \"cardinality\": \"OPTIONAL\"\n    }\n  }]\n}"
          }
        }
      },
      "nextTasks": [
        {
          "taskId": "9"
        }
      ],
      "taskExecutionStrategy": "WHEN_ALL_SUCCEED",
      "displayName": "Cloud Storage to BigQuery table",
      "externalTaskType": "NORMAL_TASK",
      "position": {
        "x": 39,
        "y": 261
      }
    },
    {
      "task": "GenericConnectorTask",
      "taskId": "9",
      "parameters": {
        "connectorInputPayload": {
          "key": "connectorInputPayload",
          "value": {
            "stringValue": "$`Task_9_connectorInputPayload`$"
          }
        },
        "authOverrideEnabled": {
          "key": "authOverrideEnabled",
          "value": {
            "booleanValue": false
          }
        },
        "connectionName": {
          "key": "connectionName",
          "value": {
            "stringValue": "projects/ca-ftp-zenn/locations/asia-northeast1/connections/bq-connector"
          }
        },
        "connectorOutputPayload": {
          "key": "connectorOutputPayload",
          "value": {
            "stringValue": "$`Task_9_connectorOutputPayload`$"
          }
        },
        "operation": {
          "key": "operation",
          "value": {
            "stringValue": "EXECUTE_ACTION"
          }
        },
        "connectionVersion": {
          "key": "connectionVersion",
          "value": {
            "stringValue": "projects/ca-ftp-zenn/locations/global/providers/gcp/connectors/bigquery/versions/1"
          }
        },
        "actionName": {
          "key": "actionName",
          "value": {
            "stringValue": "InsertLoadJob"
          }
        }
      },
      "taskExecutionStrategy": "WHEN_ALL_SUCCEED",
      "displayName": "BigQueryテーブルインサート",
      "externalTaskType": "NORMAL_TASK",
      "position": {
        "x": 39,
        "y": 365
      }
    }
  ],
  "integrationParameters": [
    {
      "key": "`ExecutionMode`",
      "dataType": "STRING_VALUE",
      "defaultValue": {
        "stringValue": ""
      },
      "displayName": "`ExecutionMode`",
      "isTransient": true
    },
    {
      "key": "`ErrorInfo`",
      "dataType": "JSON_VALUE",
      "defaultValue": {
        "jsonValue": "{\n  \"message\": \"\",\n  \"code\": 0.0\n}"
      },
      "displayName": "`ErrorInfo`",
      "isTransient": true,
      "jsonSchema": "{\n  \"$schema\": \"http://json-schema.org/draft-07/schema#\",\n  \"type\": \"object\",\n  \"properties\": {\n    \"code\": {\n      \"type\": \"number\"\n    },\n    \"message\": {\n      \"type\": \"string\"\n    }\n  }\n}"
    },
    {
      "key": "`ExecutionId`",
      "dataType": "STRING_VALUE",
      "defaultValue": {
        "stringValue": ""
      },
      "displayName": "`ExecutionId`",
      "isTransient": true
    },
    {
      "key": "`IntegrationName`",
      "dataType": "STRING_VALUE",
      "defaultValue": {
        "stringValue": ""
      },
      "displayName": "`IntegrationName`",
      "isTransient": true
    },
    {
      "key": "`Region`",
      "dataType": "STRING_VALUE",
      "defaultValue": {
        "stringValue": ""
      },
      "displayName": "`Region`",
      "isTransient": true
    },
    {
      "key": "`ProjectId`",
      "dataType": "STRING_VALUE",
      "defaultValue": {
        "stringValue": ""
      },
      "displayName": "`ProjectId`",
      "isTransient": true
    },
    {
      "key": "test",
      "dataType": "STRING_VALUE",
      "displayName": "test"
    },
    {
      "key": "`Task_1_listEntitiesPageSize`",
      "dataType": "INT_VALUE",
      "displayName": "`Task_1_listEntitiesPageSize`",
      "producer": "1_1"
    },
    {
      "key": "`Task_1_listEntitiesPageToken`",
      "dataType": "STRING_VALUE",
      "displayName": "`Task_1_listEntitiesPageToken`",
      "producer": "1_1"
    },
    {
      "key": "`Task_1_listEntitiesSortByColumns`",
      "dataType": "STRING_ARRAY",
      "displayName": "`Task_1_listEntitiesSortByColumns`",
      "producer": "1_1"
    },
    {
      "key": "`Task_1_filterClause`",
      "dataType": "STRING_VALUE",
      "displayName": "`Task_1_filterClause`",
      "producer": "1_1"
    },
    {
      "key": "`Task_1_connectorOutputPayload`",
      "dataType": "JSON_VALUE",
      "displayName": "`Task_1_connectorOutputPayload`",
      "isTransient": true,
      "producer": "1_1",
      "jsonSchema": "{\n  \"type\": [\"array\"],\n  \"items\": {\n    \"$schema\": \"http://json-schema.org/draft-07/schema#\",\n    \"type\": \"object\",\n    \"properties\": {\n      \"LocalFile\": {\n        \"type\": [\"string\", \"null\"]\n      },\n      \"LastModified\": {\n        \"type\": [\"string\", \"null\"]\n      },\n      \"IsDirectory\": {\n        \"type\": [\"boolean\", \"null\"]\n      },\n      \"Owner\": {\n        \"type\": [\"string\", \"null\"]\n      },\n      \"Filename\": {\n        \"type\": [\"string\", \"null\"],\n        \"description\": \"The name of the file or directory.\"\n      },\n      \"FileSize\": {\n        \"type\": [\"integer\", \"null\"],\n        \"description\": \"The size of the file.\"\n      },\n      \"Group\": {\n        \"type\": [\"string\", \"null\"]\n      },\n      \"FilePath\": {\n        \"type\": \"string\"\n      },\n      \"Permissions\": {\n        \"type\": [\"string\", \"null\"]\n      }\n    }\n  },\n  \"$schema\": \"http://json-schema.org/draft-07/schema#\"\n}"
    },
    {
      "key": "`Task_1_listEntitiesNextPageToken`",
      "dataType": "STRING_VALUE",
      "displayName": "`Task_1_listEntitiesNextPageToken`",
      "isTransient": true,
      "producer": "1_1"
    },
    {
      "key": "`Task_4_connectorInputPayload`",
      "dataType": "JSON_VALUE",
      "displayName": "`Task_4_connectorInputPayload`",
      "producer": "1_4",
      "jsonSchema": "{\n  \"$schema\": \"http://json-schema.org/draft-07/schema#\",\n  \"type\": \"object\",\n  \"properties\": {\n    \"RemoteFile\": {\n      \"type\": \"string\",\n      \"description\": \"The file name on the remote host.\"\n    },\n    \"HasBytes\": {\n      \"type\": [\"boolean\", \"null\"],\n      \"description\": \"Whether to download content as bytes.\"\n    }\n  },\n  \"required\": [\"RemoteFile\"]\n}"
    },
    {
      "key": "`Task_4_connectorOutputPayload`",
      "dataType": "JSON_VALUE",
      "displayName": "`Task_4_connectorOutputPayload`",
      "isTransient": true,
      "producer": "1_4",
      "jsonSchema": "{\n  \"type\": \"array\",\n  \"$schema\": \"http://json-schema.org/draft-07/schema#\",\n  \"items\": {\n    \"type\": \"object\",\n    \"properties\": {\n    },\n    \"$schema\": \"http://json-schema.org/draft-07/schema#\"\n  }\n}"
    },
    {
      "key": "`Task_5_loopMetadata`",
      "dataType": "JSON_VALUE",
      "displayName": "`Task_5_loopMetadata`",
      "isTransient": true,
      "producer": "1_5",
      "jsonSchema": "{\n  \"$schema\": \"http://json-schema.org/draft-07/schema#\",\n  \"type\": \"object\",\n  \"properties\": {\n    \"sub_integration_execution_ids\": {\n      \"type\": \"array\",\n      \"items\": {\n        \"type\": \"string\"\n      }\n    },\n    \"current_iteration_count\": {\n      \"type\": \"number\"\n    },\n    \"failure_message\": {\n      \"type\": \"string\"\n    },\n    \"failure_location\": {\n      \"type\": \"string\"\n    },\n    \"current_element\": {\n      \"type\": [\"string\", \"number\", \"object\", \"array\"]\n    }\n  }\n}"
    },
    {
      "key": "fileInformation",
      "dataType": "JSON_VALUE",
      "displayName": "fileInformation",
      "inputOutputType": "IN"
    },
    {
      "key": "`Task_7_connectorInputPayload`",
      "dataType": "JSON_VALUE",
      "displayName": "`Task_7_connectorInputPayload`",
      "producer": "1_7",
      "jsonSchema": "{\n  \"$schema\": \"http://json-schema.org/draft-07/schema#\",\n  \"type\": \"object\",\n  \"properties\": {\n    \"FolderPath\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"The path to the folder that will receive the data of the object.\"\n    },\n    \"Content\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"The Content to be uploaded in the bucket\"\n    },\n    \"ObjectName\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"The name of the uploaded object. It should be specified only when uploading the content as InputStream.\"\n    },\n    \"ContentBytes\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"Bytes content to upload as file.\"\n    },\n    \"ContentType\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"Content-Type of the object data.\",\n      \"default\": \"application/x-www-form-urlencoded\"\n    },\n    \"HasBytes\": {\n      \"type\": [\"boolean\", \"null\"],\n      \"description\": \"Whether to upload content as bytes.\",\n      \"default\": false\n    },\n    \"Bucket\": {\n      \"type\": \"string\",\n      \"description\": \"Bucket name where the object will be uploaded.\"\n    }\n  },\n  \"required\": [\"Bucket\"]\n}"
    },
    {
      "key": "`Task_7_connectorOutputPayload`",
      "dataType": "JSON_VALUE",
      "displayName": "`Task_7_connectorOutputPayload`",
      "isTransient": true,
      "producer": "1_7",
      "jsonSchema": "{\n  \"type\": \"array\",\n  \"$schema\": \"http://json-schema.org/draft-07/schema#\",\n  \"items\": {\n    \"type\": \"object\",\n    \"properties\": {\n    },\n    \"$schema\": \"http://json-schema.org/draft-07/schema#\"\n  }\n}"
    },
    {
      "key": "fileName",
      "dataType": "STRING_VALUE",
      "displayName": "fileName"
    },
    {
      "key": "`Task_9_connectorInputPayload`",
      "dataType": "JSON_VALUE",
      "displayName": "`Task_9_connectorInputPayload`",
      "producer": "1_9",
      "jsonSchema": "{\n  \"$schema\": \"http://json-schema.org/draft-07/schema#\",\n  \"type\": \"object\",\n  \"properties\": {\n    \"RangePartitioning\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"A JSON object giving the range partitioning field and buckets.\",\n      \"properties\": {\n      }\n    },\n    \"DecimalTargetTypes\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"A JSON list giving the preference order applied to numeric types.\",\n      \"properties\": {\n      }\n    },\n    \"ParquetOptions\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"A JSON object giving the Parquet-specific import options.\",\n      \"properties\": {\n      }\n    },\n    \"DestinationTableProperties\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"A JSON object containing the table friendlyName, description and list of labels.\",\n      \"properties\": {\n      }\n    },\n    \"DestinationTableSchema\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"A JSON list contianing the fields used to create the table.\",\n      \"properties\": {\n      }\n    },\n    \"DryRun\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"Whether or not this is a dry run of the job.\",\n      \"properties\": {\n      }\n    },\n    \"Clustering\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"A JSON object giving the fields to be used for clustering.\",\n      \"properties\": {\n      }\n    },\n    \"DestinationEncryptionConfiguration\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"A JSON object giving the KMS encryption settings for the table.\",\n      \"properties\": {\n      }\n    },\n    \"DestinationTable\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"The destination table for the query, in the format DestProjectId.DestDatasetId.DestTable\",\n      \"properties\": {\n      }\n    },\n    \"CSVAllowJaggedRows\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"Whether lines in CSV files may contain missing fields. False by default\",\n      \"properties\": {\n      }\n    },\n    \"CSVSkipLeadingRows\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"How many rows to skip at the start of CSV files. Usually used for skipping header rows.\",\n      \"properties\": {\n      }\n    },\n    \"IgnoreUnknownValues\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"Whether to ignore unknown fields in the input file or treat them as errors. By default they are treated as errors.\",\n      \"properties\": {\n      }\n    },\n    \"CSVEncoding\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"The name of the encoding used for CSV files.\",\n      \"properties\": {\n      }\n    },\n    \"HivePartitioningOptions\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"A JSON object giving the source-side partitioning options.\",\n      \"properties\": {\n      }\n    },\n    \"CreateDisposition\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"Whether to create the destination table if it does not exist.\",\n      \"properties\": {\n      }\n    },\n    \"SourceURIs\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"A space-separated list of Google Cloud Storage URIs\",\n      \"properties\": {\n      }\n    },\n    \"AvroUseLogicalTypes\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"Whether to use Avro logical types when converting Avro data into BigQuery types.\",\n      \"properties\": {\n      }\n    },\n    \"Region\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"The region to start executing the job in. Both the GCS resources and the BigQuery dataset must be in the same region.\",\n      \"properties\": {\n      }\n    },\n    \"CSVAllowQuotedNewlines\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"Whether CSV files can contain newlines within quoted fields.\",\n      \"properties\": {\n      }\n    },\n    \"CSVQuote\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"The character used for quoted fields in CSV files. May be set to empty to disable quoting.\",\n      \"properties\": {\n      }\n    },\n    \"SchemaUpdateOptions\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"A JSON list giving the options to apply when updating the destination table schema.\",\n      \"properties\": {\n      }\n    },\n    \"CSVFieldDelimiter\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"The character used to separate columns within CSV files.\",\n      \"properties\": {\n      }\n    },\n    \"DSBackupProjectionFields\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"A JSON list of fields to load from a Cloud datastore backup.\",\n      \"properties\": {\n      }\n    },\n    \"CSVNullMarker\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"If provided, this string is used for NULL values within CSV files. By default CSV files cannot use NULL.\",\n      \"properties\": {\n      }\n    },\n    \"WriteDisposition\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"How to write data to the destination table, such as truncate existing results, appending existing results, or writing only when the table is empty.\",\n      \"properties\": {\n      }\n    },\n    \"SourceFormat\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"The source format that the files are formatted in.\",\n      \"properties\": {\n      }\n    },\n    \"Autodetect\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"Whether options and schema should be automatically determined for JSON and CSV files.\",\n      \"properties\": {\n      }\n    },\n    \"MaximumBadRecords\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"If provided, the number of records that can be invalid before the entire job is canceled. By default all records must be valid.\",\n      \"properties\": {\n      }\n    },\n    \"TimePartitioning\": {\n      \"type\": [\"string\", \"null\"],\n      \"description\": \"A JSON object giving the time partitioning type and field.\",\n      \"properties\": {\n      }\n    }\n  }\n}"
    },
    {
      "key": "`Task_9_connectorOutputPayload`",
      "dataType": "JSON_VALUE",
      "displayName": "`Task_9_connectorOutputPayload`",
      "isTransient": true,
      "producer": "1_9",
      "jsonSchema": "{\n  \"type\": \"array\",\n  \"$schema\": \"http://json-schema.org/draft-07/schema#\",\n  \"items\": {\n    \"type\": \"object\",\n    \"properties\": {\n    },\n    \"$schema\": \"http://json-schema.org/draft-07/schema#\"\n  }\n}"
    },
    {
      "key": "schemas",
      "dataType": "JSON_VALUE",
      "displayName": "schemas",
      "inputOutputType": "IN"
    },
    {
      "key": "fileType",
      "dataType": "STRING_VALUE",
      "displayName": "fileType"
    },
    {
      "key": "coffeeOrderSchema",
      "dataType": "STRING_VALUE",
      "defaultValue": {
        "stringValue": "{\t\"fields\":\n[\n\n\t{\n\t\t\"name\": \"order_id\",\n\t\t\"type\": \"INTEGER\",\n\t\t\"mode\": \"NULLABLE\",\n\t\t\"description\": \"注文を一意に識別するID\"\n\t},\n\t{\n\t\t\"name\": \"customer_name\",\n\t\t\"type\": \"STRING\",\n\t\t\"mode\": \"NULLABLE\",\n\t\t\"description\": \"顧客の名前\"\n\t},\n\t{\n\t\t\"name\": \"drink\",\n\t\t\"type\": \"STRING\",\n\t\t\"mode\": \"NULLABLE\",\n\t\t\"description\": \"注文された飲み物の種類\"\n\t},\n\t{\n\t\t\"name\": \"size\",\n\t\t\"type\": \"STRING\",\n\t\t\"mode\": \"NULLABLE\",\n\t\t\"description\": \"飲み物のサイズ (例: S, M, L)\"\n\t},\n\t{\n\t\t\"name\": \"special_request\",\n\t\t\"type\": \"STRING\",\n\t\t\"mode\": \"NULLABLE\",\n\t\t\"description\": \"顧客からの特別なリクエスト\"\n\t},\n\t{\n\t\t\"name\": \"barista_notes\",\n\t\t\"type\": \"STRING\",\n\t\t\"mode\": \"NULLABLE\",\n\t\t\"description\": \"バリスタが記録したメモ\"\n\t},\n\t{\n\t\t\"name\": \"order_timestamp\",\n\t\t\"type\": \"DATETIME\",\n\t\t\"mode\": \"NULLABLE\",\n\t\t\"description\": \"注文が行われた日時\"\n\t}\n]\n}\n"
      },
      "displayName": "coffeeOrderSchema"
    },
    {
      "key": "salesReportSchema",
      "dataType": "STRING_VALUE",
      "defaultValue": {
        "stringValue": "{\n\t\"fields\": [\n\t\t{\n\t\t\t\"name\": \"report_date\",\n\t\t\t\"type\": \"DATE\",\n\t\t\t\"mode\": \"NULLABLE\",\n\t\t\t\"description\": \"レポートの日付\"\n\t\t},\n\t\t{\n\t\t\t\"name\": \"store_id\",\n\t\t\t\"type\": \"STRING\",\n\t\t\t\"mode\": \"NULLABLE\",\n\t\t\t\"description\": \"店舗ID\"\n\t\t},\n\t\t{\n\t\t\t\"name\": \"item_code\",\n\t\t\t\"type\": \"STRING\",\n\t\t\t\"mode\": \"NULLABLE\",\n\t\t\t\"description\": \"商品コード\"\n\t\t},\n\t\t{\n\t\t\t\"name\": \"sales_quantity\",\n\t\t\t\"type\": \"INTEGER\",\n\t\t\t\"mode\": \"NULLABLE\",\n\t\t\t\"description\": \"販売数量\"\n\t\t},\n\t\t{\n\t\t\t\"name\": \"revenue\",\n\t\t\t\"type\": \"BIGNUMERIC\",\n\t\t\t\"mode\": \"NULLABLE\",\n\t\t\t\"description\": \"売上金額\"\n\t\t}\n\t]\n}\n"
      },
      "displayName": "salesReportSchema"
    },
    {
      "key": "upsertQueries",
      "dataType": "JSON_VALUE",
      "displayName": "upsertQueries",
      "inputOutputType": "IN"
    }
  ],
  "databasePersistencePolicy": "DATABASE_PERSISTENCE_ASYNC"
}

画像の三点リーダーをクリックした際に出てくる Upload Integration からインポートできます。

まとめ

今回は Application Integration で FTP サーバーから Cloud Storage を経由して BigQuery までのデータ連携を実行しました。

Application Integration は GUI ベースでデータ連携を直感的に操作できるツールですが、データそのものの加工には向いていない側面もあると感じました。
もしデータ加工を行いたいのであれば、BigQuery の一時テーブルを活用する方法があります。
具体的には、直接本テーブルに格納するのではなく、まず BigQuery の一時テーブルに生データを格納します。その後、BigQueryの強力な SQL クエリ機能(例えば、CREATE TABLE AS SELECT や MERGE といったステートメントなど)を利用して、一時テーブルに格納されたデータを整形・加工し、最終的に本テーブルへ INSERT または MERGE する方法です。
この方法を利用することで Application Integration のデータ処理はシンプルにしつつ、BigQuery のクエリを利用して複雑なデータの加工ロジックを分離させることができます。
これはあくまで一例ですので、他の方法も模索してみてください。

Application Integration を構築する際、この記事が参考になれば幸いです。

Discussion