Anazon AppFlow を用いて Amazon S3 バケット上の CSV/JSON の中身をTiDB Serverlessに書き込む

2024/09/26に公開

https://docs.pingcap.com/ja/tidbcloud/dev-guide-aws-appflow-integration
PingCapがTiDB とAmazon AppFlowを統合して様々なデータソースと連携し、その中身をバッチでTiDBに書き込むサンプルを提供してくれています。

上記ドキュンメント上はデータソースにSalesforceをもちいていますが、ちょっとしたお勉強目的でSalesforceアカウントを準備するのはなかなか大変です。ということで少し改造しAmazon S3の中に格納されているCSV/JSONファイルの中身をTiDBに書き込む手順を纏めます。

1. 環境整備

AWS CloudShellを使います
PingCapがTiDBへデータを書き込むためのAppFlowと連携する専用のLambda関数をJavaで提供してくれているため、mvnをインストールします。

sudo yum update -y
sudo yum install maven -y

つぎにGitから提供されているサンプルレポジトリをクローンします。

git clone https://github.com/pingcap-inc/tidb-appflow-integration

2. AWS SAM/CloudFormationを使ってLambda 関数のアップロード

cd tidb-appflow-integration
mvn clean package

BUILD SUCCESSと表示されれば次に進みます。targetフォルダの中にtest-appflow-and-tidb-1.0.jarというLambda関数用パッケージが出来ていますのでこれをアップロードします。

sam deploy --guided

y/nで聞かれるものには全てy、入力を求められるものには全てそのままEnterでデフォルトで作業を進めます。
2つのCloudFormationスタックが作成されるので完成まで待ちます。

完成するとLambda関数が一つ作成されています。

この関数がAppFlowと連携してデータをTiDBに書き込みます。

3. Amazon S3 バケットの設定とCSVファイルのアップロード

このシナリオではCSVを使ってみます。

Column1,Column2
Row1-Col1,Row1-Col2
Row2-Col1,Row2-Col2
Row3-Col1,Row3-Col2
Row4-Col1,Row4-Col2

このCSVファイルを適当な名前で作成し保存します。
次にこれを適当なS3バケットのappflowフォルダに保存します。拡張子は.csvにしておきます。AppFlowはバケット直下のファイルは指定できないようです。

4. データ書き込み用テーブルをTiDBで作成

USE test;

CREATE TABLE appflow (
    column1 VARCHAR(255) NOT NULL,
    column2 VARCHAR(255) NOT NULL
);

5. AppFlow の設定

コネクタの登録

まずはデータターゲットとなるTiDBをコネクタとして登録します。実際はLambda関数経由での書き込みとなるため、登録すべきはLambda関数です。新しいコネクタを登録をクリックします。

先ほど作成されたLambda関数を選択して登録をクリックします。

接続の作成

次に先ほどテーブルを作成したTiDB上のデータベースへの接続を登録します。TiDBはMySQL互換ですので、MySQLドライバーを用います。接続を作成をクリックします。

必要なパラメータを入力して接続するをクリックします。TLSの使用は必須です。

フローの作成

先ほど作成したコネクタと接続を使用してTiDBのテーブルとの連携がセットできていますので、Amazon S3からデータを流すフローを作成します。
フローを作成をクリックします。

適当な名前を入力して次へをクリックします。送信もとにCSVが保存されているS3バケットを選択し、CSVが格納されているフォルダを指定します。前述の通りバケット直下は指定不可ですので気を付けてください。

コネクタ接続を指定します。

ここまでの設定があっていればオブジェクトのドロップダウンにTiDBのテーブルが表示されますので先ほど作成したappflowを指定します。あとはデフォルトのまま次へをクリックします。

S3バケット内のCSVのカラムを認識していますのでそれを書き込むTiDBテーブルのカラムにマッピングします。(マッピングフィールドボタンを押すと登録されます)


次へを2回押してフローを作成をクリックするとフローが作成されます。

実行

では作成されたフローを実行します。

データが問題なく書き込まれています。

Discussion