Athena の名前付きクエリをAPIから実行したい(実装編)
概要
Athena の名前付きクエリをAPIから実行する方法を調査しました。
「名前付きクエリをAPIから実行する」プログラムを作成し、動作を確認してみました。
クエリ実行の流れ
クエリ実行からクエリ結果取得まで、以下のような流れで進みます。
- 名前付きクエリを用意
- 名前付きクエリ実行
- 処理完了まで待機
- 時折進捗を確認
- クエリ結果を要求
1. 名前付きクエリを用意
Athena のマネジメントコンソール画面から、「Save As」を選択することで、名前付きクエリを作成・保存できます。
保存したクエリ情報は、「保存したクエリ」タブから確認できます。
クエリエディタでクエリを調整し、完成したら名前を付けて保存しましょう。
ID は、名前付きクエリを実行する際に必要となるので、控えておきます。
2. 名前付きクエリ実行
API から、名前付きクエリを実行します。
NAMED_QUERY_ID="実行したい名前付きクエリのID"
QUERY_STRING=$(aws athena get-named-query --named-query-id ${NAMED_QUERY_ID} --query 'NamedQuery.QueryString' --output text)
aws athena start-query-execution \
--query-string "${QUERY_STRING}" \
--result-configuraiton "OutputLocation"="クエリ結果を保存したいS3のURI" \
--execition-parameters "名前付きクエリにて設定した '?' に代入したいパラメータ"
余談 名前付きクエリの名前から、IDを引っ張りたい
名前付きクエリの名前からIDを引っ張る方法はないかな?と思いましたが、残念ながら、効率の良い方法はなさそうです。
list-named-queries
というAPIは存在しますが、こちらはIDだけで、クエリ名から検索はできなさそうでした。
$ aws athena list-named-queries
{
"NamedQueryIds": [
"860b831c-88d2-4451-a2f9-df54573cee5d",
"15c735cb-23f5-4ab8-9623-31304d1c2adf",
"0c64c24f-bb57-4781-b7db-d1ff9befea76",
"3ece7251-8631-4d3b-b218-7707031234e4",
"e5cf1e5b-2df8-4159-9baf-f9af105a898e",
"4655fef4-bf45-41e2-8eb4-ba7d0d8308da",
"b2c1f5bd-fe3d-4b2d-bdec-a4ec8f7ca937",
"6a72263f-92ac-4c60-b736-775790f8aee4"
]
}
頑張ると、 ID どころか Query String を直接引っ張ることも可能です。
$ QUERY_NAME="実行したいクエリ名"
$ QUERY_STRING=$(for id in $(aws athena list-named-queries --query "NamedQueryIds" --output text); do aws athena get-named-query --named-query-id $id; done | jq -s | jq ".[] | select(.NamedQuery.Name == \"${QUERY_NAME}\")" | jq .NamedQuery.QueryString)
if 文を交えると、 "目的のクエリ名を見つけたら Break する" みたいなこともできそうですが、ギブアップ。
3. 進捗確認 & 処理完了まで待機
get-query-execution
API を使用することで、実行したクエリのステータスを確認できます。
クエリステータスの種類は QUEUED
, RUNNING
, SUCCEEDED
, FAILED
, CANCELLED
の5種類です。
The state of query execution. QUEUED indicates that the query has been submitted to the service, and Athena will execute the query as soon as resources are available. RUNNING indicates that the query is in execution phase. SUCCEEDED indicates that the query completed without errors. FAILED indicates that the query experienced an error and did not complete processing. CANCELLED indicates that a user input interrupted query execution.
## QUERY_STATUS が QUEUED (または RUNNING)から脱するまでループ
while true;
do
QUERY_STATUS=$(aws athena get-query-execution --query-execution-id ${QUERY_EXECUTION_ID} --query 'QueryExecution.Status.State' --output text)
if [ $QUERY_STATUS = "SUCCEEDED" -o $QUERY_STATUS = "FAILED" -o $QUERY_STATUS = "CANCELLED" ]; then
break
fi
echo "current status: ${QUERY_STATUS}."
sleep 10
done
4. クエリ結果を要求
get-query-results
API を使用することで、クエリ結果を取得できます。
if [ $QUERY_STATUS = "SUCCEEDED" ]; then
result=$(aws athena get-query-results --query-execution-id ${QUERY_EXECUTION_ID})
echo $result
fi
クエリ結果のフォーマットは、以下公式ドキュメントをご確認ください。
クエリ結果は独自のJSONフォーマットであり、やや扱いにくいです。
get-query-results
を使用する以外にも、 S3に保存されている Athena のクエリ結果(csv) をダウンロードする方法もあります。
Athena のクエリ結果 (S3 オブジェクト) のURIを取得するには、大きく以下の方法があります。
-
get-query-execution
APIから取得する - 自力で組み立てる
get-query-execution
APIから取得する
get-query-execution
API から、クエリ結果を保存したオブジェクトの S3 URI を取得できます。
aws athena get-query-execution --query-execution-id ${QUERY_EXECUTION_ID} --query 'QueryExecution.ResultConfiguration.OutputLocation' --output text
自力で組み立てる
Athena のクエリ結果 (S3 オブジェクト) のURI 構造は、以下の通りです。
"OutputLocationで設定した S3 URI"/"${query-execution-id}".csv
(例)
以下を前提とします。
- OutputLocation で指定したS3 URI ...
s3://sample-bucket/athena-result/
- クエリにて得られた QUERY_EXECUTION_ID ...
48660106-9ee8-4a89-bc5f-9644ee077140
このとき、Athena のクエリ結果 (S3 オブジェクト) のURIは、以下の通りです。
s3://sample-bucket/athena-result/48660106-9ee8-4a89-bc5f-9644ee077140.csv
OutputLocation および QUERY_EXECUTION_ID は、どちらもクエリを実行する過程で使用するため、簡単に組み立てることが可能です。
考慮事項(制約)
公式ドキュメントに記載。
特に気になった制約
- 現在、? パラメータは WHERE 句にのみ配置できます。SELECT ? FROM table のような構文はサポートされていません。
- SQL 実行パラメータを文字列として扱うには、二重引用符ではなく一重引用符で囲む必要があります。
- Athena コンソールの実行パラメータを持つクエリでは、疑問符の数が最大 25 個に制限されます。
所感
QueryResult のパースが大変。
パースしてくれる便利なライブラリが存在しないのであれば、S3に保存したクエリ結果(CSV)を扱った方がよさそう。
特に python なら、 DataFrame を使用することで CSV と JSON (dictオブジェクト)を容易に変換できるので、使い勝手はよさそう。
参考文献
- AWS CLI 公式ドキュメント
- https://docs.aws.amazon.com/cli/latest/reference/athena/get-named-query.html
- https://docs.aws.amazon.com/cli/latest/reference/athena/start-query-execution.html
- https://docs.aws.amazon.com/cli/latest/reference/athena/get-query-execution.html
- https://docs.aws.amazon.com/cli/latest/reference/athena/get-query-results.html
- AWS SDK V3 公式ドキュメント
- https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/client/athena/command/GetNamedQueryCommand/
- https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/client/athena/command/StartQueryExecutionCommand/
- https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/client/athena/command/GetQueryExecutionCommand/
- https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/client/athena/command/GetQueryResultsCommand/
- コードサンプル集
- 他
付録
Bash (aws cli) 版 サンプルコード
#!/bin/bash
set -euC
NAMED_QUERY_ID="実行したい名前付きクエリのID"
ATHENA_RESULT_S3_URI="athena クエリ結果を保存する S3 の URI" ## ex. "s3://sample-bucket/athena/result/"
EXECUTION_PARAMETERS='"後埋めのパラメータ1","後埋めのパラメータ1"' ## ex. '"hoge","fuga","1"'
QUERY_STRING=$(aws athena get-named-query \
--named-query-id ${NAMED_QUERY_ID} \
--query 'NamedQuery.QueryString' \
--output text)
QUERY_EXECUTION_ID=$(aws athena start-query-execution \
--query-string "${QUERY_STRING}" \
--result-configuration "OutputLocation"="${ATHENA_RESULT_S3_URI}" \
--execution-parameters $(echo $EXECUTION_PARAMETERS | tr ',' ' ') \
--query 'QueryExecutionId' --output text)
echo $QUERY_EXECUTION_ID
## クエリが実行中(≒QUERY_STATUS が QUEUED (または RUNNING))の間はループ
## QUERY_STATUS : https://docs.aws.amazon.com/cli/latest/reference/athena/get-query-execution.html
while true;
do
QUERY_STATUS=$(aws athena get-query-execution --query-execution-id ${QUERY_EXECUTION_ID} --query 'QueryExecution.Status.State' --output text)
if [ $QUERY_STATUS = "SUCCEEDED" -o $QUERY_STATUS = "FAILED" -o $QUERY_STATUS = "CANCELLED" ]; then
break
fi
echo "current status: ${QUERY_STATUS}."
sleep 10
done
echo $QUERY_STATUS
## クエリに成功していたら、クエリ結果を出力
if [ $QUERY_STATUS = "SUCCEEDED" ]; then
## result のフォーマットは、以下公式ドキュメントを参照
## https://docs.aws.amazon.com/cli/latest/reference/athena/get-query-results.html#examples
result=$(aws athena get-query-results --query-execution-id ${QUERY_EXECUTION_ID})
echo $result
fi
AWS SDK V3 (javascript) 版 サンプルコード
const {
AthenaClient,
GetNamedQueryCommand,
StartQueryExecutionCommand,
GetQueryResultsCommand,
GetQueryExecutionCommand,
} = require("@aws-sdk/client-athena");
const ATHENA_QUERY_OUTPUT_S3_LOCATION = `athena クエリ結果を保存する S3 の URI`; // ex. "s3://sample-bucket/athena/result/"
// https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/client/athena/command/GetNamedQueryCommand/
const getQueryStringById = async (NamedQueryId) => {
const client = new AthenaClient({});
const input = {
NamedQueryId,
};
const command = new GetNamedQueryCommand(input);
const response = await client.send(command);
return response.NamedQuery?.QueryString;
};
// https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/client/athena/command/StartQueryExecutionCommand/
const startQueryExecution = async (QueryString, ExecutionParameters) => {
const client = new AthenaClient({});
const input = {
QueryString,
ResultConfiguration: {
OutputLocation: ATHENA_QUERY_OUTPUT_S3_LOCATION,
},
ExecutionParameters,
};
const command = new StartQueryExecutionCommand(input);
const response = await client.send(command);
return response.QueryExecutionId;
};
// https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/client/athena/command/GetQueryResultsCommand/
const getQueryResults = async (queryExecutionId) => {
const client = new AthenaClient({});
const waitQueryFinished = async (queryExecutionId) => {
const sleep = (msec) => new Promise((resolve) => setTimeout(resolve, msec));
const queryDone = (response) => {
const queryStatus = response.QueryExecution.Status.State;
return queryStatus === "SUCCEEDED" || queryStatus === "FAILED" | queryStatus === "CANCELLED";
}
const input = {
QueryExecutionId: queryExecutionId,
}
const command = new GetQueryExecutionCommand(input);
while(true){
const response = await client.send(command);
if(queryDone(response)) {
return response.QueryExecution.Status.State;
}
await sleep(10000);//10秒待機
}
}
// クエリ完了まで待機
const queryStatus = await waitQueryFinished(queryExecutionId);
// queryStatus が SUCCEEDED だったら、クエリ結果を取得して返却
if (queryStatus === "SUCCEEDED") {
const input = {
QueryExecutionId: queryExecutionId,
};
const command = new GetQueryResultsCommand(input);
const response = await client.send(command);
return response;
} else {
throw new Error (`Query Failed. status:${queryStatus}`);
}
};
const run = async () => {
// パラメータの準備
const namedQueryId = "実行したい、名前付きQueryのID情報";
const executionParameters = [""]; // 名前付き Query に後埋めのパラメータがあれば、それを設定
// Athena へクエリ
const QueryString = await getQueryStringById(namedQueryId);
const queryExecutionId = await startQueryExecution(QueryString, executionParameters);
const queryResult = await getQueryResults(queryExecutionId);
// 以下、頑張って query result をパース
return queryResult
};
run().then(console.log);
Discussion