Amazon Athena Federated QueryでRDS, DynamoDB, Redshiftにクエリをかける
はじめに
阿河です。
唐突ですが、バラバラなデータソースから情報を集めるのに苦労したことはありませんか?
現代のビジネス環境では、データを迅速かつ効率的に処理し、意思決定をサポートする能力が企業競争力の鍵となっています。特に複数データソースからの情報を統合し、全体像を把握することは、多くの企業にとって日常的な課題です。
Amazon Athena Federated Queryは、このような課題に対する強力な解決策を提供します。
Athena Federated Queryを使用することで、企業は異なるデータソース(例えば、RDS, DynamoDB, Redshiftなど)に対して直接SQLクエリを実行できるようになり、データの収集と分析を大幅に効率化できます。この技術により、データを異なる場所に移動させることなく、一元的に問い合わせることが可能となり、ビジネスの洞察を速やかに得ることができます。
本記事では、具体的なビジネスシナリオとして、eコマース企業のデータを例に取り上げ、Athena Federated Queryの設定方法と基本的な利用シナリオを紹介します。
目次
- 解決したい課題
- Athena Federated Queryの概要と利点
- 構築例
解決したい課題
データ駆動型の意思決定がビジネス成功の鍵となる現代において、データソースが多岐にわたるため、情報を統合し、迅速にアクセスすることが大きな課題となっています。特に、異なるデータベース技術やストレージソリューションに分散して保存されているデータに効率的にアクセスすることは、多くの企業にとっての難題です。
例えば、ある企業では
- 顧客データはDynamoDB
- 商品情報はRDSに
- 販売履歴はRedshift
に保存されています。
これらの情報を一元的に分析するためには、通常、データを一つの場所に集約する必要がありますが、このプロセスは時間がかかり、コストもかさみます。
また、データの移動はセキュリティリスクを伴うことがあり、リアルタイムでのデータアクセスを実現するには不十分です。
このような状況では、Amazon Athena Federated Queryを利用することで、各データソースに対して直接クエリを実行し、データを移動させることなく、リアルタイムでの分析と意思決定が可能になります。
Athena Federated Queryの概要
Federated Queryがないと何が困るか
- 複数データソースの検索を容易にするために、データウェアハウスを構築し、関連データのETL処理を行う複雑なパイプライン処理を構築するケースがある
- データパイプライン処理によってデータのリアルタイム性は低くなり、複数システム間のデータの正確性や一貫性を検証するカスタムプロセスが必要になる
- 連携元のアプリケーションが変更されるとデータパイプライン処理も変更する必要がある。データ修正が必要になる
工数かかりそうです。
Federated Queryがあることで何ができるか
- データをその保管場所から直接クエリすることができるため、データを移動することなく、異なるデータソースをクエリすることが可能になります
- これにより、データのリアルタイム分析や、異なるデータソース間でのデータのJOINなど、迅速な分析が可能になります。
- この機能は、Lambda関数で実行されるコネクタを使用して、Amazon RDS、Amazon DynamoDB、Amazon Redshift、さらにGoogle BigQueryやSnowflakeなど、AWS外のデータソースへのクエリを可能にします
- 対応データソースは上記リンクをご参照ください
構築例
テスト用リソースをCloudFormationで作成
ネットワークコンポーネント
AWSTemplateFormatVersion: '2010-09-09'
Resources:
VPC:
Type: 'AWS::EC2::VPC'
Properties:
CidrBlock: '10.0.0.0/16'
EnableDnsSupport: true
EnableDnsHostnames: true
PublicSubnet:
Type: 'AWS::EC2::Subnet'
Properties:
VpcId: !Ref VPC
CidrBlock: '10.0.1.0/24'
MapPublicIpOnLaunch: true
InternetGateway:
Type: 'AWS::EC2::InternetGateway'
AttachGateway:
Type: 'AWS::EC2::VPCGatewayAttachment'
Properties:
VpcId: !Ref VPC
InternetGatewayId: !Ref InternetGateway
RouteTable:
Type: 'AWS::EC2::RouteTable'
Properties:
VpcId: !Ref VPC
PublicRoute:
Type: 'AWS::EC2::Route'
Properties:
RouteTableId: !Ref RouteTable
DestinationCidrBlock: '0.0.0.0/0'
GatewayId: !Ref InternetGateway
SubnetRouteTableAssociation:
Type: 'AWS::EC2::SubnetRouteTableAssociation'
Properties:
SubnetId: !Ref PublicSubnet
RouteTableId: !Ref RouteTable
PrivateSubnet:
Type: 'AWS::EC2::Subnet'
Properties:
VpcId: !Ref VPC
CidrBlock: '10.0.2.0/24'
MapPublicIpOnLaunch: false
AvailabilityZone: !Select [0, !GetAZs '']
PrivateSubnet2:
Type: 'AWS::EC2::Subnet'
Properties:
VpcId: !Ref VPC
CidrBlock: '10.0.3.0/24'
MapPublicIpOnLaunch: false
AvailabilityZone: !Select [1, !GetAZs '']
NATGateway:
Type: 'AWS::EC2::NatGateway'
Properties:
SubnetId: !Ref PublicSubnet
AllocationId: !GetAtt ElasticIP.AllocationId
ElasticIP:
Type: 'AWS::EC2::EIP'
Properties:
Domain: vpc
PrivateRouteTable:
Type: 'AWS::EC2::RouteTable'
Properties:
VpcId: !Ref VPC
PrivateRoute:
Type: 'AWS::EC2::Route'
Properties:
RouteTableId: !Ref PrivateRouteTable
DestinationCidrBlock: '0.0.0.0/0'
NatGatewayId: !Ref NATGateway
PrivateSubnetRouteTableAssociation:
Type: 'AWS::EC2::SubnetRouteTableAssociation'
Properties:
SubnetId: !Ref PrivateSubnet
RouteTableId: !Ref PrivateRouteTable
PrivateSubnet2RouteTableAssociation:
Type: 'AWS::EC2::SubnetRouteTableAssociation'
Properties:
SubnetId: !Ref PrivateSubnet2
RouteTableId: !Ref PrivateRouteTable
RDSSecurityGroup:
Type: 'AWS::EC2::SecurityGroup'
Properties:
GroupDescription: "Security group for RDS"
VpcId: !Ref VPC
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 3306
ToPort: 3306
CidrIp: 10.0.0.0/16
RedshiftSecurityGroup:
Type: 'AWS::EC2::SecurityGroup'
Properties:
GroupDescription: "Security group for Redshift"
VpcId: !Ref VPC
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 5439
ToPort: 5439
CidrIp: 10.0.0.0/16
Outputs:
VPCId:
Value: !Ref VPC
Export:
Name: "VPCID"
PublicSubnetId:
Value: !Ref PublicSubnet
Export:
Name: "PublicSubnetId"
PrivateSubnetId:
Value: !Ref PrivateSubnet
Export:
Name: "PrivateSubnetId"
PrivateSubnet2Id:
Value: !Ref PrivateSubnet2
Export:
Name: "PrivateSubnet2Id"
NATGatewayId:
Value: !Ref NATGateway
Export:
Name: "NATGatewayId"
RDSSecurityGroupId:
Description: "Security Group ID for RDS"
Value: !Ref RDSSecurityGroup
Export:
Name: "RDSSecurityGroupId"
RedshiftSecurityGroupId:
Description: "Security Group ID for Redshift"
Value: !Ref RedshiftSecurityGroup
Export:
Name: "RedshiftSecurityGroupId"
DynamoDB
AWSTemplateFormatVersion: '2010-09-09'
Resources:
CustomerTable:
Type: 'AWS::DynamoDB::Table'
Properties:
TableName: "CustomerData"
AttributeDefinitions:
- AttributeName: "CustomerId"
AttributeType: "S"
KeySchema:
- AttributeName: "CustomerId"
KeyType: "HASH"
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
RDS
※テンプレートの「xxxx」は、任意のパスワードに変更してください。
※「BastionSecurityGroup」のCidrIpは、任意のIPアドレスに変更ください。
※KeyPairは事前にキーペアを作成し、テンプレート内で指定しています。
AWSTemplateFormatVersion: '2010-09-09'
Resources:
DBSubnetGroup:
Type: 'AWS::RDS::DBSubnetGroup'
Properties:
DBSubnetGroupDescription: "Subnet group for RDS"
SubnetIds:
- Fn::ImportValue: "PrivateSubnetId"
- Fn::ImportValue: "PrivateSubnet2Id"
DBInstance:
Type: 'AWS::RDS::DBInstance'
Properties:
DBInstanceClass: "db.t3.medium"
Engine: "mysql"
DBName: "ProductDB"
MasterUsername: "admin"
MasterUserPassword: "xxxxxxxxxxx"
AllocatedStorage: "20"
DBSubnetGroupName: !Ref DBSubnetGroup
VPCSecurityGroups:
- Fn::ImportValue: "RDSSecurityGroupId"
BastionHost:
Type: 'AWS::EC2::Instance'
Properties:
InstanceType: 't3.micro'
ImageId: !Sub "{{resolve:ssm:/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2}}"
SubnetId: !ImportValue "PublicSubnetId"
KeyName: "BastionKey"
SecurityGroupIds:
- !Ref BastionSecurityGroup
Tags:
- Key: Name
Value: BastionHost
BastionSecurityGroup:
Type: 'AWS::EC2::SecurityGroup'
Properties:
GroupDescription: "Allow SSH access to Bastion Host"
VpcId: !ImportValue "VPCID"
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 22
ToPort: 22
CidrIp: 0.0.0.0/0 # Change to your IP address
SecurityGroupEgress:
- IpProtocol: -1
CidrIp: 0.0.0.0/0
- RDSに接続するために、パブリックサブネットにEC2インスタンスを作成しています。
Redshift
※テンプレートの「xxxx」は、任意のパスワードに変更してください。
AWSTemplateFormatVersion: '2010-09-09'
Resources:
ClusterSubnetGroup:
Type: 'AWS::Redshift::ClusterSubnetGroup'
Properties:
Description: "Subnet group for Redshift"
SubnetIds:
- Fn::ImportValue: "PrivateSubnetId"
RedshiftCluster:
Type: 'AWS::Redshift::Cluster'
Properties:
ClusterType: "single-node"
NodeType: "dc2.large"
MasterUsername: "redshiftadmin"
MasterUserPassword: "xxxxxxxxx"
DBName: "salesdb"
ClusterSubnetGroupName: !Ref ClusterSubnetGroup
VpcSecurityGroupIds:
- Fn::ImportValue: "RedshiftSecurityGroupId"
テーブル作成 / データ投入
DynamoDB
// テーブルの確認
$ aws dynamodb list-tables
- 出力にCustomerDataテーブルが含まれていることを確認する
// データ投入
$ aws dynamodb put-item --table-name CustomerData --item '{
"CustomerId": {"S": "C001"},
"Name": {"S": "daisuke sato"},
"Email": {"S": "daisuke.sato@example.com"},
"Address": {"S": "123 ex1 Street"}
}'
$ aws dynamodb put-item --table-name CustomerData --item '{
"CustomerId": {"S": "C002"},
"Name": {"S": "taro yamamoto"},
"Email": {"S": "taro.yamamoto@example.com"},
"Address": {"S": "456 ex2 Street"}
}'
//データ投入後の確認
$ aws dynamodb scan --table-name CustomerData
- JSON形式で2つのアイテムが返ってくることを確認する
RDS
- パブリックサブネットにデプロイしたEC2インスタンスにログインして、RDSに接続します
- CloudFormationでデプロイ時には、RDSのセキュリティグループはEC2インスタンスからのインバウンド接続を許可していません。手動で設定変更ください
CREATE TABLE ProductInfo (
ProductId INT PRIMARY KEY AUTO_INCREMENT,
ProductName VARCHAR(100),
Category VARCHAR(50),
Price DECIMAL(10,2),
Stock INT
);
- テーブル作成する
INSERT INTO ProductInfo (ProductName, Category, Price, Stock) VALUES
('Laptop', 'Electronics', 1200.00, 50),
('Headphones', 'Electronics', 150.00, 200),
('Desk Chair', 'Furniture', 300.00, 20);
- サンプルデータを投入する
Redshift
- 販売履歴テーブルをSQLスクリプトで作成・投入する。
CREATE TABLE SalesHistory (
SaleId INT PRIMARY KEY,
CustomerId VARCHAR(10),
ProductId INT,
Quantity INT,
SaleDate DATE
);
- テーブル作成する
INSERT INTO SalesHistory (SaleId, CustomerId, ProductId, Quantity, SaleDate) VALUES
(1, 'C001', 1, 2, '2024-12-16'),
(2, 'C002', 2, 1, '2024-12-17'),
(3, 'C001', 3, 1, '2024-12-18');
- データ投入する
データソース設定 /検証
DynamoDB
- 各データソースコネクタは、Athenaコンソールから手軽にデプロイが可能です(下記は2024/12/17時点の手順です)
- Athenaコンソールのサイドメニューから、「Data Sources and catalogs」を選択
- 「Create data source」
- 希望するデータソースを選択(DynamoDB)
- 「Data source name」に任意の名前を入力
- 「Spill location in Amazon S3」に任意のS3バケットを指定
- 上記設定で、データソース作成
- データソース作成後にAthenaクエリエディタに移動すると、作成したデータソースを選ぶことができるようになる
- 実際にクエリをかけてみる ↓↓↓
- 前述の工程で作成したDynamoDBのテーブルデータにアクセスできていることが分かる
RDS
- DynamoDBのときと同様に「Create data source」を実行
- 希望するデータソースを選択(MySQL)
- 「Data source name」に任意の名前を入力
- Host: ※RDSのエンドポイント
- Port: 3306
- Database: ProductDB
- JDBC parameters: useSSL=false&allowPublicKeyRetrieval=true
- Secret: ※シークレットを新規作成。DBのユーザー名とパスワードを管理
- VPC/Subnet: RDSがデプロイされているVPC/サブネットID
- securityグループ: ※アウトバウンド許可
- 「Spill location in Amazon S3」に任意のS3バケットを指定
上記設定で、データソース作成する。
次にRDSのセキュリティグループのインバウンドルールで、Lambdaのセキュリティグループからのインバウンド通信を許可する。
以上データソース設定が完了したら、クエリエディタでクエリを実行します。
作成したデータソース/Databaseを指定。
Redshift
データコネクタの作成方法は、RDSと同様の考え方です。
- 「Create data source」を実行
- 希望するデータソースを選択(Redshift)
- 「Data source name」に任意の名前を入力
- Host: ※Redshiftのエンドポイント。ポート番号やDB名は含まない
- Port: 5439
- Database: salesdb
- JDBC parameters: ssl=true
- Secret: ※シークレットを新規作成。DBのユーザー名とパスワードを管理
- VPC/Subnet: RedshiftがデプロイされているVPC/サブネットID
- securityグループ: ※アウトバウンド許可
- 「Spill location in Amazon S3」に任意のS3バケットを指定
上記設定で、データソース作成する。
次にRedshiftのセキュリティグループのインバウンドルールで、Lambdaのセキュリティグループからのインバウンド通信を許可する。
こちらもクエリができることを確認できました。
複数データソースのクエリ検証
SELECT
c.customerid,
c.name,
p.productname,
s.quantity,
s.saledate
FROM
AthenaDynamoSource.default."customerdata" AS c
JOIN
AthenaRedshiftSource2.public."saleshistory" AS s
ON s.customerid = c.customerid
JOIN
AthenaRDSSource.ProductDB."ProductInfo" AS p
ON p.productid = s.productid
ORDER BY
c.customerid, s.saledate;
おわりに
本記事では、Amazon Athena Federated Queryを使用してDynamoDB、RDS、Redshiftといった異なるデータソースに対してクエリを実行する方法を解説しました。
Federated Queryは、データを一元化せずに既存のデータソースから直接クエリを実行できるため、ETLプロセスを簡素化し、リアルタイムなデータ分析を可能にします。特に、複数のデータソースを統合的に活用したいケースでは、大幅な効率化が期待できます。
今後は、さらなるデータソースの統合や高度なクエリの活用方法についても紹介していく予定です。
次回の記事もご覧いただけると嬉しいです。
Discussion