📘

Amazon Athena Federated QueryでRDS, DynamoDB, Redshiftにクエリをかける

2024/12/17に公開

はじめに

阿河です。

唐突ですが、バラバラなデータソースから情報を集めるのに苦労したことはありませんか?

現代のビジネス環境では、データを迅速かつ効率的に処理し、意思決定をサポートする能力が企業競争力の鍵となっています。特に複数データソースからの情報を統合し、全体像を把握することは、多くの企業にとって日常的な課題です。

Amazon Athena Federated Queryは、このような課題に対する強力な解決策を提供します。

Athena Federated Queryを使用することで、企業は異なるデータソース(例えば、RDS, DynamoDB, Redshiftなど)に対して直接SQLクエリを実行できるようになり、データの収集と分析を大幅に効率化できます。この技術により、データを異なる場所に移動させることなく、一元的に問い合わせることが可能となり、ビジネスの洞察を速やかに得ることができます。

本記事では、具体的なビジネスシナリオとして、eコマース企業のデータを例に取り上げ、Athena Federated Queryの設定方法と基本的な利用シナリオを紹介します。

目次

  1. 解決したい課題
  2. Athena Federated Queryの概要と利点
  3. 構築例

解決したい課題

データ駆動型の意思決定がビジネス成功の鍵となる現代において、データソースが多岐にわたるため、情報を統合し、迅速にアクセスすることが大きな課題となっています。特に、異なるデータベース技術やストレージソリューションに分散して保存されているデータに効率的にアクセスすることは、多くの企業にとっての難題です。

例えば、ある企業では

  • 顧客データはDynamoDB
  • 商品情報はRDSに
  • 販売履歴はRedshift

に保存されています。

これらの情報を一元的に分析するためには、通常、データを一つの場所に集約する必要がありますが、このプロセスは時間がかかり、コストもかさみます。
また、データの移動はセキュリティリスクを伴うことがあり、リアルタイムでのデータアクセスを実現するには不十分です。

このような状況では、Amazon Athena Federated Queryを利用することで、各データソースに対して直接クエリを実行し、データを移動させることなく、リアルタイムでの分析と意思決定が可能になります。

Athena Federated Queryの概要

https://docs.aws.amazon.com/ja_jp/athena/latest/ug/connect-to-a-data-source.html
https://aws.amazon.com/jp/blogs/news/query-any-data-source-with-amazon-athenas-new-federated-query/

Federated Queryがないと何が困るか

  • 複数データソースの検索を容易にするために、データウェアハウスを構築し、関連データのETL処理を行う複雑なパイプライン処理を構築するケースがある
  • データパイプライン処理によってデータのリアルタイム性は低くなり、複数システム間のデータの正確性や一貫性を検証するカスタムプロセスが必要になる
  • 連携元のアプリケーションが変更されるとデータパイプライン処理も変更する必要がある。データ修正が必要になる

工数かかりそうです。

Federated Queryがあることで何ができるか

  • データをその保管場所から直接クエリすることができるため、データを移動することなく、異なるデータソースをクエリすることが可能になります
  • これにより、データのリアルタイム分析や、異なるデータソース間でのデータのJOINなど、迅速な分析が可能になります。

https://docs.aws.amazon.com/ja_jp/athena/latest/ug/connectors-available.html

  • この機能は、Lambda関数で実行されるコネクタを使用して、Amazon RDS、Amazon DynamoDB、Amazon Redshift、さらにGoogle BigQueryやSnowflakeなど、AWS外のデータソースへのクエリを可能にします
  • 対応データソースは上記リンクをご参照ください

構築例

テスト用リソースをCloudFormationで作成

ネットワークコンポーネント

AWSTemplateFormatVersion: '2010-09-09'

Resources:
  VPC:
    Type: 'AWS::EC2::VPC'
    Properties:
      CidrBlock: '10.0.0.0/16'
      EnableDnsSupport: true
      EnableDnsHostnames: true

  PublicSubnet:
    Type: 'AWS::EC2::Subnet'
    Properties:
      VpcId: !Ref VPC
      CidrBlock: '10.0.1.0/24'
      MapPublicIpOnLaunch: true

  InternetGateway:
    Type: 'AWS::EC2::InternetGateway'

  AttachGateway:
    Type: 'AWS::EC2::VPCGatewayAttachment'
    Properties:
      VpcId: !Ref VPC
      InternetGatewayId: !Ref InternetGateway

  RouteTable:
    Type: 'AWS::EC2::RouteTable'
    Properties:
      VpcId: !Ref VPC

  PublicRoute:
    Type: 'AWS::EC2::Route'
    Properties:
      RouteTableId: !Ref RouteTable
      DestinationCidrBlock: '0.0.0.0/0'
      GatewayId: !Ref InternetGateway

  SubnetRouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      SubnetId: !Ref PublicSubnet
      RouteTableId: !Ref RouteTable

  PrivateSubnet:
    Type: 'AWS::EC2::Subnet'
    Properties:
      VpcId: !Ref VPC
      CidrBlock: '10.0.2.0/24'
      MapPublicIpOnLaunch: false
      AvailabilityZone: !Select [0, !GetAZs ''] 

  PrivateSubnet2:
    Type: 'AWS::EC2::Subnet'
    Properties:
      VpcId: !Ref VPC
      CidrBlock: '10.0.3.0/24'
      MapPublicIpOnLaunch: false
      AvailabilityZone: !Select [1, !GetAZs '']

  NATGateway:
    Type: 'AWS::EC2::NatGateway'
    Properties:
      SubnetId: !Ref PublicSubnet
      AllocationId: !GetAtt ElasticIP.AllocationId

  ElasticIP:
    Type: 'AWS::EC2::EIP'
    Properties:
      Domain: vpc

  PrivateRouteTable:
    Type: 'AWS::EC2::RouteTable'
    Properties:
      VpcId: !Ref VPC

  PrivateRoute:
    Type: 'AWS::EC2::Route'
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      DestinationCidrBlock: '0.0.0.0/0'
      NatGatewayId: !Ref NATGateway

  PrivateSubnetRouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      SubnetId: !Ref PrivateSubnet
      RouteTableId: !Ref PrivateRouteTable

  PrivateSubnet2RouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      SubnetId: !Ref PrivateSubnet2
      RouteTableId: !Ref PrivateRouteTable

  RDSSecurityGroup:
    Type: 'AWS::EC2::SecurityGroup'
    Properties:
      GroupDescription: "Security group for RDS"
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 3306
          ToPort: 3306
          CidrIp: 10.0.0.0/16  

  RedshiftSecurityGroup:
    Type: 'AWS::EC2::SecurityGroup'
    Properties:
      GroupDescription: "Security group for Redshift"
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 5439
          ToPort: 5439
          CidrIp: 10.0.0.0/16 

Outputs:
  VPCId:
    Value: !Ref VPC
    Export:
      Name: "VPCID"

  PublicSubnetId:
    Value: !Ref PublicSubnet
    Export:
      Name: "PublicSubnetId"

  PrivateSubnetId:
    Value: !Ref PrivateSubnet
    Export:
      Name: "PrivateSubnetId"

  PrivateSubnet2Id:
    Value: !Ref PrivateSubnet2
    Export:
      Name: "PrivateSubnet2Id"

  NATGatewayId:
    Value: !Ref NATGateway
    Export:
      Name: "NATGatewayId"

  RDSSecurityGroupId:
    Description: "Security Group ID for RDS"
    Value: !Ref RDSSecurityGroup
    Export:
      Name: "RDSSecurityGroupId"

  RedshiftSecurityGroupId:
    Description: "Security Group ID for Redshift"
    Value: !Ref RedshiftSecurityGroup
    Export:
      Name: "RedshiftSecurityGroupId"

DynamoDB

AWSTemplateFormatVersion: '2010-09-09'

Resources:
  CustomerTable:
    Type: 'AWS::DynamoDB::Table'
    Properties:
      TableName: "CustomerData"
      AttributeDefinitions:
        - AttributeName: "CustomerId"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "CustomerId"
          KeyType: "HASH"
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

RDS

※テンプレートの「xxxx」は、任意のパスワードに変更してください。
※「BastionSecurityGroup」のCidrIpは、任意のIPアドレスに変更ください。
※KeyPairは事前にキーペアを作成し、テンプレート内で指定しています。

AWSTemplateFormatVersion: '2010-09-09'

Resources:
  DBSubnetGroup:
    Type: 'AWS::RDS::DBSubnetGroup'
    Properties:
      DBSubnetGroupDescription: "Subnet group for RDS"
      SubnetIds:
        - Fn::ImportValue: "PrivateSubnetId"
        - Fn::ImportValue: "PrivateSubnet2Id"
  DBInstance:
    Type: 'AWS::RDS::DBInstance'
    Properties:
      DBInstanceClass: "db.t3.medium"
      Engine: "mysql"
      DBName: "ProductDB"
      MasterUsername: "admin"
      MasterUserPassword: "xxxxxxxxxxx"
      AllocatedStorage: "20"
      DBSubnetGroupName: !Ref DBSubnetGroup
      VPCSecurityGroups:
        - Fn::ImportValue: "RDSSecurityGroupId"

  BastionHost:
    Type: 'AWS::EC2::Instance'
    Properties:
      InstanceType: 't3.micro'
      ImageId: !Sub "{{resolve:ssm:/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2}}"
      SubnetId: !ImportValue "PublicSubnetId"
      KeyName: "BastionKey"     
      SecurityGroupIds:
        - !Ref BastionSecurityGroup
      Tags:
        - Key: Name
          Value: BastionHost

  BastionSecurityGroup:
    Type: 'AWS::EC2::SecurityGroup'
    Properties:
      GroupDescription: "Allow SSH access to Bastion Host"
      VpcId: !ImportValue "VPCID"
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 22
          ToPort: 22
          CidrIp: 0.0.0.0/0 # Change to your IP address
      SecurityGroupEgress:
        - IpProtocol: -1
          CidrIp: 0.0.0.0/0
  • RDSに接続するために、パブリックサブネットにEC2インスタンスを作成しています。

Redshift

※テンプレートの「xxxx」は、任意のパスワードに変更してください。

AWSTemplateFormatVersion: '2010-09-09'

Resources:
  ClusterSubnetGroup:
    Type: 'AWS::Redshift::ClusterSubnetGroup'
    Properties:
      Description: "Subnet group for Redshift"
      SubnetIds:
        - Fn::ImportValue: "PrivateSubnetId"

  RedshiftCluster:
    Type: 'AWS::Redshift::Cluster'
    Properties:
      ClusterType: "single-node"
      NodeType: "dc2.large"
      MasterUsername: "redshiftadmin"
      MasterUserPassword: "xxxxxxxxx"
      DBName: "salesdb"
      ClusterSubnetGroupName: !Ref ClusterSubnetGroup
      VpcSecurityGroupIds:
        - Fn::ImportValue: "RedshiftSecurityGroupId"

テーブル作成 / データ投入

DynamoDB

// テーブルの確認

$ aws dynamodb list-tables
  • 出力にCustomerDataテーブルが含まれていることを確認する

// データ投入

$ aws dynamodb put-item --table-name CustomerData --item '{
  "CustomerId": {"S": "C001"},
  "Name": {"S": "daisuke sato"},
  "Email": {"S": "daisuke.sato@example.com"},
  "Address": {"S": "123 ex1 Street"}
}'

$ aws dynamodb put-item --table-name CustomerData --item '{
  "CustomerId": {"S": "C002"},
  "Name": {"S": "taro yamamoto"},
  "Email": {"S": "taro.yamamoto@example.com"},
  "Address": {"S": "456 ex2 Street"}
}'

//データ投入後の確認

$ aws dynamodb scan --table-name CustomerData
  • JSON形式で2つのアイテムが返ってくることを確認する

RDS

  • パブリックサブネットにデプロイしたEC2インスタンスにログインして、RDSに接続します
  • CloudFormationでデプロイ時には、RDSのセキュリティグループはEC2インスタンスからのインバウンド接続を許可していません。手動で設定変更ください
CREATE TABLE ProductInfo (
    ProductId INT PRIMARY KEY AUTO_INCREMENT,
    ProductName VARCHAR(100),
    Category VARCHAR(50),
    Price DECIMAL(10,2),
    Stock INT
);
  • テーブル作成する
INSERT INTO ProductInfo (ProductName, Category, Price, Stock) VALUES 
('Laptop', 'Electronics', 1200.00, 50),
('Headphones', 'Electronics', 150.00, 200),
('Desk Chair', 'Furniture', 300.00, 20);
  • サンプルデータを投入する

Redshift

  • 販売履歴テーブルをSQLスクリプトで作成・投入する。
CREATE TABLE SalesHistory (
    SaleId INT PRIMARY KEY,
    CustomerId VARCHAR(10),
    ProductId INT,
    Quantity INT,
    SaleDate DATE
);
  • テーブル作成する
INSERT INTO SalesHistory (SaleId, CustomerId, ProductId, Quantity, SaleDate) VALUES
(1, 'C001', 1, 2, '2024-12-16'),
(2, 'C002', 2, 1, '2024-12-17'),
(3, 'C001', 3, 1, '2024-12-18');
  • データ投入する

データソース設定 /検証

DynamoDB

https://docs.aws.amazon.com/ja_jp/athena/latest/ug/connect-to-a-data-source-lambda-deploying.html

  • 各データソースコネクタは、Athenaコンソールから手軽にデプロイが可能です(下記は2024/12/17時点の手順です)
    • Athenaコンソールのサイドメニューから、「Data Sources and catalogs」を選択
    • 「Create data source」
    • 希望するデータソースを選択(DynamoDB)
    • 「Data source name」に任意の名前を入力
    • 「Spill location in Amazon S3」に任意のS3バケットを指定
    • 上記設定で、データソース作成

  • データソース作成後にAthenaクエリエディタに移動すると、作成したデータソースを選ぶことができるようになる
  • 実際にクエリをかけてみる ↓↓↓

  • 前述の工程で作成したDynamoDBのテーブルデータにアクセスできていることが分かる

RDS

  • DynamoDBのときと同様に「Create data source」を実行
  • 希望するデータソースを選択(MySQL)
  • 「Data source name」に任意の名前を入力
  • Host: ※RDSのエンドポイント
  • Port: 3306
  • Database: ProductDB
  • JDBC parameters: useSSL=false&allowPublicKeyRetrieval=true
  • Secret: ※シークレットを新規作成。DBのユーザー名とパスワードを管理
  • VPC/Subnet: RDSがデプロイされているVPC/サブネットID
  • securityグループ: ※アウトバウンド許可
  • 「Spill location in Amazon S3」に任意のS3バケットを指定

上記設定で、データソース作成する。

次にRDSのセキュリティグループのインバウンドルールで、Lambdaのセキュリティグループからのインバウンド通信を許可する。

以上データソース設定が完了したら、クエリエディタでクエリを実行します。
作成したデータソース/Databaseを指定。

Redshift

データコネクタの作成方法は、RDSと同様の考え方です。

  • 「Create data source」を実行
  • 希望するデータソースを選択(Redshift)
  • 「Data source name」に任意の名前を入力
  • Host: ※Redshiftのエンドポイント。ポート番号やDB名は含まない
  • Port: 5439
  • Database: salesdb
  • JDBC parameters: ssl=true
  • Secret: ※シークレットを新規作成。DBのユーザー名とパスワードを管理
  • VPC/Subnet: RedshiftがデプロイされているVPC/サブネットID
  • securityグループ: ※アウトバウンド許可
  • 「Spill location in Amazon S3」に任意のS3バケットを指定

上記設定で、データソース作成する。

次にRedshiftのセキュリティグループのインバウンドルールで、Lambdaのセキュリティグループからのインバウンド通信を許可する。

こちらもクエリができることを確認できました。

複数データソースのクエリ検証

SELECT 
    c.customerid, 
    c.name, 
    p.productname, 
    s.quantity, 
    s.saledate
FROM 
    AthenaDynamoSource.default."customerdata" AS c
JOIN 
    AthenaRedshiftSource2.public."saleshistory" AS s 
    ON s.customerid = c.customerid
JOIN 
    AthenaRDSSource.ProductDB."ProductInfo" AS p 
    ON p.productid = s.productid  
ORDER BY 
    c.customerid, s.saledate;

おわりに

本記事では、Amazon Athena Federated Queryを使用してDynamoDB、RDS、Redshiftといった異なるデータソースに対してクエリを実行する方法を解説しました。

Federated Queryは、データを一元化せずに既存のデータソースから直接クエリを実行できるため、ETLプロセスを簡素化し、リアルタイムなデータ分析を可能にします。特に、複数のデータソースを統合的に活用したいケースでは、大幅な効率化が期待できます。

今後は、さらなるデータソースの統合や高度なクエリの活用方法についても紹介していく予定です。
次回の記事もご覧いただけると嬉しいです。

MEGAZONE株式会社 Tech Blog

Discussion