📈

AWS Data Pipelineを使ってRDS MySQLのデータを定期的にS3に出力する

2020/12/31に公開

概要

AWS RDSのMySQLに保存されているデータをcsvで日次で集計・出力して利用したいなーと思うことがあったので、どのような構成で出力するのがベターかを比較検討し、最終的にAWS Data Pipelineを選択しました。そして利用したメリット・デメリットを記載します。

要件

AWS RDS MySQLのあるテーブル(ここではhogeとします)の昨日インサートされたレコードを以下のようなフォーマットで毎朝7:00にs3に出力したい

SELECT
  *
FROM
  hoge
WHERE created_at BETWEEN '2020-12-01 00:00:00' AND '2020-12-01 23:59:59'
id huga piyo created_at
1 aaa 123 2020-12-01 00:00:00
2 bbb 456 2020-12-01 00:00:01
s3://sample_bucket/hoge/2020-12-01.csv

なぜData Pipelineを選んだか?

調査を行う過程で、AWS GlueとAWS Data Pipelineが候補に上がりました。

https://aws.amazon.com/jp/glue/faqs/

Q: AWS Glue とAWS Data Pipeline はどのように使い分けますか?

AWS Glue では、Apache Spark サーバーレス環境で運用されるマネージド型 ETL サービスが提供されています。お客様は ETL ジョブに集中でき、基盤となるコンピューティングリソースの設定や管理について心配する必要がありません。AWS Glue ではデータ優先のアプローチが採用されており、ビジネスインサイトを引き出すことができる形式にデータを変換するための、データのプロパティと操作に集中できます。データカタログが統合され、ETL に加えて Amazon Athena や Amazon Redshift Spectrum でのクエリにもメタデータを利用できるようになります。

AWS Data Pipeline はマネージド型のオーケストレーションサービスで、実行環境、コードを実行するコンピューティングリソースに対するアクセスと制御、およびデータ処理のコード自体について、より大きな柔軟性を備えています。AWS Data Pipeline ではお客様のアカウント内でコンピューティングリソースが起動され、Amazon EC2 インスタンスや Amazon EMR クラスターに直接アクセスできるようになります。

さらに、AWS Glue の ETL ジョブは、Scala または Python をベースとしています。Apache Spark 以外のエンジンの使用が必要なユースケースや、Hive や Pig などさまざまなエンジンで実行される複数の異種ジョブを実行する場合は、AWS Data Pipeline をお勧めします。

こちらを読み進めていくと、「Glueの方が楽そうかな・・・?」と感じ、実際にGlueを触ってみると、

  • 前提としてApache Sparkの知識が必要で、データカタログやクローラーの作成をしたりさまざまな前処理が絡んでくること
  • Apache Sparkが分散処理を実現しているから・・?なのか出力したcsvが分割され、それをひとまとめにするのに調整が必要になった(一応実現はできましたが・・・
  • Data Pipelineと比較すると料金が高い
    • クローラーの実行には複数のDPUが使用され、最小単位10分の利用で1時間あたり1DPU 0.44USD

といった感じで、もちろん知識不足もあるのですが、今回の要件に対して非常にオーバースペックな印象を受けました。そのためData Pipelineを利用したところ、要件に合うベターな構成になったかな?と思ったので紹介します。

実践してみた

https://docs.aws.amazon.com/ja_jp/datapipeline/latest/DeveloperGuide/what-is-datapipeline.html

構成

構成と大まかな流れとしては、MySQLサーバが配置されているサブネットにEC2を定期的に配置して、そのインスタンスがSQLを実行しcsvを生成、S3に出力します。

作った構成をJSONでエクスポートするとこんな感じ

{
  "objects": [
    {
      "filePath": "#{myOutputS3Loc}#{myRDSTableName}/#{@scheduledStartTime}.csv",
      "name": "S3OutputLocation",
      "id": "S3OutputLocation",
      "type": "S3DataNode"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "schedule": {
        "ref": "DefaultSchedule"
      },
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "#{myOutputS3Loc}log",
      "scheduleType": "cron",
      "name": "Default",
      "id": "Default"
    },
    {
      "databaseName": "#{myRDSDatabaseName}",
      "*password": "#{*myRDSPassword}",
      "name": "rds_mysql",
      "jdbcProperties": "allowMultiQueries=true",
      "id": "rds_mysql",
      "type": "RdsDatabase",
      "rdsInstanceId": "#{myRDSInstanceId}",
      "username": "#{myRDSUsername}"
    },
    {
      "subnetId": "#{mySubnetId}",
      "instanceType": "#{myEC2InstanceType}",
      "name": "Ec2Instance",
      "actionOnTaskFailure": "terminate",
      "id": "Ec2Instance",
      "type": "Ec2Resource",
      "terminateAfter": "2 Hours"
    },
    {
      "output": {
        "ref": "S3OutputLocation"
      },
      "input": {
        "ref": "SourceRDSTable"
      },
      "name": "RDStoS3CopyActivity",
      "runsOn": {
        "ref": "Ec2Instance"
      },
      "id": "RDStoS3CopyActivity",
      "type": "CopyActivity"
    },
    {
      "database": {
        "ref": "rds_mysql"
      },
      "name": "SourceRDSTable",
      "id": "SourceRDSTable",
      "type": "SqlDataNode",
      "selectQuery": "#{myQuery}",
      "table": "#{myRDSTableName}"
    },
    {
      "period": "1 days",
      "startDateTime": "2020-12-01T22:00:00",
      "name": "Every 1 day",
      "id": "DefaultSchedule",
      "type": "Schedule"
    }
  ],
  "parameters": [
    {
      "description": "RDS MySQL password",
      "id": "*myRDSPassword",
      "type": "String"
    },
    {
      "description": "Output S3 folder",
      "id": "myOutputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "RDS MySQL username",
      "id": "myRDSUsername",
      "type": "String"
    },
    {
      "helpText": "The name of an existing table or a new table that will be created based on the Create table SQL query parameter below.",
      "description": "RDS MySQL table name",
      "id": "myRDSTableName",
      "type": "String"
    },
    {
      "default": "t2.micro",
      "helpText": "The type of the EC2 instance that will be launched on your behalf to do the copy",
      "description": "EC2 instance type",
      "id": "myEC2InstanceType",
      "type": "String"
    },
    {
      "watermark": "DB Instance",
      "description": "RDS Instance ID",
      "id": "myRDSInstanceId",
      "type": "String"
    },
    {
      "watermark": "Database Name",
      "description": "Database Name",
      "id": "myRDSDatabaseName",
      "type": "String"
    },
    {
      "watermark": "Subnet ID",
      "description": "Subnet ID",
      "id": "mySubnetId",
      "type": "String"
    },
    {
      "watermark": "myQuery",
      "description": "myQuery",
      "id": "myQuery",
      "type": "String"
    }
  ],
  "values": {
    "mySubnetId": "hoge_subnet_id",
    "myRDSInstanceId": "hoge_instance_id",
    "myRDSUsername": "hoge_user",
    "myRDSDatabaseName": "hoge_database"
    "myEC2InstanceType": "t2.micro",
    "myOutputS3Loc": "s3://sample_bucket/",
    "*myRDSPassword": "hoge_password",
    "myRDSTableName": "hoge",
    "myQuery": "SELECT * FROM hoge WHERE created_at BETWEEN CONCAT(DATE_SUB(CURRENT_DATE, INTERVAL 1 DAY), ' 00:00:00') AND CONCAT(DATE_SUB(CURRENT_DATE, INTERVAL 1 DAY), ' 23:59:59')"
  }
}

ポイント

  • ParametersとValues
    • テーブル名などの定義値をまとめた(今回の変数は例です)
  • IAM Role
    • DataPipelineDefaultResourceRoleDataPipelineDefaultRoleそれぞれData Pipeline作成時にデフォルトで作成されるロール

https://docs.aws.amazon.com/ja_jp/datapipeline/latest/DeveloperGuide/dp-get-setup.html

  • startDateTimeに指定する時刻はUTC
    • ファイル名に#{@scheduledStartTime}.csvを指定してあげることで、実行時刻の日時が挿入される
  • subnet_idを指定することで対象のサブネットにEC2が生成される
    • RDS側のセキュリティグループでサブネット内からのアクセスでかつTCPプロトコルの対象のポートが許可されていればアクセス可能

実行

無事に生成されていました。

ファイル名がUTCで時刻まで入っているのは今回は諦めました。要件として問題あれば、s3の作成イベントをトリガーにリネームなどで対処できるかも?

良かった点

出力フォーマットをSQLで表現できる

定義ファイルを見ていただければわかると思いますが、SQLを書くとそれがそのままcsvになって出力されるので、とても楽でした。

テンプレートが用意されているので構築が楽

今回の要件にバッチリ合うテンプレートが用意されていました。今回はFull Copy of RDS MySQL table to S3を利用しましたが、Incremental copyやAWS CLIの実行も便利そうですね。

定義ファイルのインポート/エクスポートが可能なので横展開が楽

json形式でエクスポートしたものをインポートできるので、定義ファイルを調整すれば別のテーブルや別のクエリにも展開できますね。

料金が安い

https://aws.amazon.com/jp/datapipeline/pricing/

低頻度実行なら無料枠があるので、Data Pipeline自体はほぼ無料で利用できました。
Data PipelineがEC2を起動するので別途インスタンス料金はかかりますが、SQLによっては実行時間はかなり短く、またインスタンスサイズはt2.microで済んでいるので、ほとんどかかりませんでした。

困った点

スケジューラーが過去の日付だと、過去分がすべて実行される

startDateTimeを仮に1ヶ月まえに設定してしまうと、本日分までの1ヶ月分の実行が完了するまでされ続けます。わかっていて利用すれば問題ではありませんが、定義ファイルを横展開する場合はstartDateTimeを調整しなければなりません。

AWS Chatbot用のイベントが提供されていない

実行通知の成功失敗通知をSlackにしようと思っていたのですが、AWS Chatbotとの統合がCodeBuildなどのようになされていません。

https://aws.amazon.com/jp/about-aws/whats-new/2020/04/receive-notifications-for-aws-codebuild-codecommit-codedeploy-codepipeline-in-slack/

なので、CloudWatch EventとSNSを利用して通知の構成を作成する必要があります。
(Data PipelineのGUIはかなり古い感じのままだし、あまりアップデートがなされる雰囲気がない・・・)

ヘッダー行の出力には工夫が必要

csvのヘッダー行を出力をするためのオプションが用意されていないので、要件として必要であれば、SQL文で無理矢理挿入するしかありませんでした。

SELECT
  'id', 'huga', 'piyo', 'created_at'
UNION ALL
SELECT
  id, huga, piyo, created_at
FROM
  hoge

こんな感じで・・・

まとめ

今回はAWS Data Pipelineを用いてcsv出力を定期実行してみました。
一度使って理解するとかなりシンプルな機構でした。料金もかなりお手頃ですし、なかなか良いサービスだなーと思いました。今度はAWS Glueをもうすこし勉強し、ちゃんと理解した上で使ってみたいです。

Discussion