🔍

Athena の名前付きクエリをAPIから実行したい(実装編)

2024/06/08に公開

概要

Athena の名前付きクエリをAPIから実行する方法を調査しました。

「名前付きクエリをAPIから実行する」プログラムを作成し、動作を確認してみました。

クエリ実行の流れ

クエリ実行からクエリ結果取得まで、以下のような流れで進みます。

  1. 名前付きクエリを用意
  2. 名前付きクエリ実行
  3. 処理完了まで待機
    • 時折進捗を確認
  4. クエリ結果を要求

1. 名前付きクエリを用意

Athena のマネジメントコンソール画面から、「Save As」を選択することで、名前付きクエリを作成・保存できます。

名前を付けて保存

保存したクエリ情報は、「保存したクエリ」タブから確認できます。

保存したクエリを確認する

クエリエディタでクエリを調整し、完成したら名前を付けて保存しましょう。

ID は、名前付きクエリを実行する際に必要となるので、控えておきます。

2. 名前付きクエリ実行

API から、名前付きクエリを実行します。

クエリIDを設定
NAMED_QUERY_ID="実行したい名前付きクエリのID"
設定した名前付きクエリを、文字列として取得
QUERY_STRING=$(aws athena get-named-query --named-query-id ${NAMED_QUERY_ID} --query 'NamedQuery.QueryString'  --output text)
クエリ実行
aws athena start-query-execution \
  --query-string "${QUERY_STRING}" \
  --result-configuraiton "OutputLocation"="クエリ結果を保存したいS3のURI" \
  --execition-parameters "名前付きクエリにて設定した '?' に代入したいパラメータ"
余談 名前付きクエリの名前から、IDを引っ張りたい

名前付きクエリの名前からIDを引っ張る方法はないかな?と思いましたが、残念ながら、効率の良い方法はなさそうです。

list-named-queries というAPIは存在しますが、こちらはIDだけで、クエリ名から検索はできなさそうでした。

$ aws athena list-named-queries
{
    "NamedQueryIds": [
        "860b831c-88d2-4451-a2f9-df54573cee5d",
        "15c735cb-23f5-4ab8-9623-31304d1c2adf",
        "0c64c24f-bb57-4781-b7db-d1ff9befea76",
        "3ece7251-8631-4d3b-b218-7707031234e4",
        "e5cf1e5b-2df8-4159-9baf-f9af105a898e",
        "4655fef4-bf45-41e2-8eb4-ba7d0d8308da",
        "b2c1f5bd-fe3d-4b2d-bdec-a4ec8f7ca937",
        "6a72263f-92ac-4c60-b736-775790f8aee4"
    ]
}

頑張ると、 ID どころか Query String を直接引っ張ることも可能です。

全件リストして検索をかける非効率プラン
$ QUERY_NAME="実行したいクエリ名"
$ QUERY_STRING=$(for id in $(aws athena list-named-queries --query "NamedQueryIds" --output text); do aws athena get-named-query --named-query-id $id; done | jq -s | jq ".[] | select(.NamedQuery.Name == \"${QUERY_NAME}\")" | jq .NamedQuery.QueryString)

if 文を交えると、 "目的のクエリ名を見つけたら Break する" みたいなこともできそうですが、ギブアップ。

3. 進捗確認 & 処理完了まで待機

get-query-execution API を使用することで、実行したクエリのステータスを確認できます。

クエリステータスの種類は QUEUED , RUNNING , SUCCEEDED , FAILED , CANCELLED の5種類です。

The state of query execution. QUEUED indicates that the query has been submitted to the service, and Athena will execute the query as soon as resources are available. RUNNING indicates that the query is in execution phase. SUCCEEDED indicates that the query completed without errors. FAILED indicates that the query experienced an error and did not complete processing. CANCELLED indicates that a user input interrupted query execution.

クエリ完了まで待機
## QUERY_STATUS が QUEUED (または RUNNING)から脱するまでループ
while true; 
do
  QUERY_STATUS=$(aws athena get-query-execution --query-execution-id ${QUERY_EXECUTION_ID} --query 'QueryExecution.Status.State' --output text)

  if [ $QUERY_STATUS = "SUCCEEDED" -o $QUERY_STATUS = "FAILED" -o $QUERY_STATUS = "CANCELLED" ]; then
    break
  fi
  echo "current status: ${QUERY_STATUS}."
  sleep 10
done

4. クエリ結果を要求

get-query-results API を使用することで、クエリ結果を取得できます。

クエリ結果取得
if [ $QUERY_STATUS = "SUCCEEDED" ]; then
  result=$(aws athena get-query-results --query-execution-id ${QUERY_EXECUTION_ID})
  echo $result
fi

クエリ結果のフォーマットは、以下公式ドキュメントをご確認ください。

クエリ結果は独自のJSONフォーマットであり、やや扱いにくいです。

get-query-results を使用する以外にも、 S3に保存されている Athena のクエリ結果(csv) をダウンロードする方法もあります。

Athena のクエリ結果 (S3 オブジェクト) のURIを取得するには、大きく以下の方法があります。

  • get-query-execution APIから取得する
  • 自力で組み立てる

get-query-execution APIから取得する

get-query-execution API から、クエリ結果を保存したオブジェクトの S3 URI を取得できます。

aws athena get-query-execution --query-execution-id ${QUERY_EXECUTION_ID} --query 'QueryExecution.ResultConfiguration.OutputLocation' --output text

自力で組み立てる

Athena のクエリ結果 (S3 オブジェクト) のURI 構造は、以下の通りです。

"OutputLocationで設定した S3 URI"/"${query-execution-id}".csv

(例)
以下を前提とします。

  • OutputLocation で指定したS3 URI ... s3://sample-bucket/athena-result/
  • クエリにて得られた QUERY_EXECUTION_ID ... 48660106-9ee8-4a89-bc5f-9644ee077140

このとき、Athena のクエリ結果 (S3 オブジェクト) のURIは、以下の通りです。

  • s3://sample-bucket/athena-result/48660106-9ee8-4a89-bc5f-9644ee077140.csv

OutputLocation および QUERY_EXECUTION_ID は、どちらもクエリを実行する過程で使用するため、簡単に組み立てることが可能です。

考慮事項(制約)

公式ドキュメントに記載。

特に気になった制約

  • 現在、? パラメータは WHERE 句にのみ配置できます。SELECT ? FROM table のような構文はサポートされていません。
  • SQL 実行パラメータを文字列として扱うには、二重引用符ではなく一重引用符で囲む必要があります。
  • Athena コンソールの実行パラメータを持つクエリでは、疑問符の数が最大 25 個に制限されます。

所感

QueryResult のパースが大変。

パースしてくれる便利なライブラリが存在しないのであれば、S3に保存したクエリ結果(CSV)を扱った方がよさそう。

特に python なら、 DataFrame を使用することで CSV と JSON (dictオブジェクト)を容易に変換できるので、使い勝手はよさそう。

参考文献

付録

Bash (aws cli) 版 サンプルコード
#!/bin/bash

set -euC

NAMED_QUERY_ID="実行したい名前付きクエリのID"
ATHENA_RESULT_S3_URI="athena クエリ結果を保存する S3 の URI" ## ex. "s3://sample-bucket/athena/result/"
EXECUTION_PARAMETERS='"後埋めのパラメータ1","後埋めのパラメータ1"' ## ex. '"hoge","fuga","1"'

QUERY_STRING=$(aws athena get-named-query \
  --named-query-id ${NAMED_QUERY_ID} \
  --query 'NamedQuery.QueryString'  \
  --output text)

QUERY_EXECUTION_ID=$(aws athena start-query-execution \
  --query-string "${QUERY_STRING}" \
  --result-configuration "OutputLocation"="${ATHENA_RESULT_S3_URI}" \
  --execution-parameters $(echo $EXECUTION_PARAMETERS | tr ',' ' ') \
  --query 'QueryExecutionId' --output text)

echo $QUERY_EXECUTION_ID

## クエリが実行中(≒QUERY_STATUS が QUEUED (または RUNNING))の間はループ
## QUERY_STATUS : https://docs.aws.amazon.com/cli/latest/reference/athena/get-query-execution.html
while true; 
do
  QUERY_STATUS=$(aws athena get-query-execution --query-execution-id ${QUERY_EXECUTION_ID} --query 'QueryExecution.Status.State' --output text)

  if [ $QUERY_STATUS = "SUCCEEDED" -o $QUERY_STATUS = "FAILED" -o $QUERY_STATUS = "CANCELLED" ]; then
    break
  fi
  echo "current status: ${QUERY_STATUS}."
  sleep 10
done

echo $QUERY_STATUS

## クエリに成功していたら、クエリ結果を出力
if [ $QUERY_STATUS = "SUCCEEDED" ]; then
  ## result のフォーマットは、以下公式ドキュメントを参照
  ## https://docs.aws.amazon.com/cli/latest/reference/athena/get-query-results.html#examples
  result=$(aws athena get-query-results --query-execution-id ${QUERY_EXECUTION_ID})
  echo $result
fi
AWS SDK V3 (javascript) 版 サンプルコード
const {
  AthenaClient,
  GetNamedQueryCommand,
  StartQueryExecutionCommand,
  GetQueryResultsCommand,
  GetQueryExecutionCommand,
} = require("@aws-sdk/client-athena"); 

const ATHENA_QUERY_OUTPUT_S3_LOCATION = `athena クエリ結果を保存する S3 の URI`; // ex. "s3://sample-bucket/athena/result/"


// https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/client/athena/command/GetNamedQueryCommand/
const getQueryStringById = async (NamedQueryId) => {
  const client = new AthenaClient({});
  const input = {
    NamedQueryId,
  };
  const command = new GetNamedQueryCommand(input);
  const response = await client.send(command);
  return response.NamedQuery?.QueryString;
};

// https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/client/athena/command/StartQueryExecutionCommand/
const startQueryExecution = async (QueryString, ExecutionParameters) => {
  const client = new AthenaClient({});
  const input = {
    QueryString, 
    ResultConfiguration: {
      OutputLocation: ATHENA_QUERY_OUTPUT_S3_LOCATION,
    },
    ExecutionParameters,
  };
  const command = new StartQueryExecutionCommand(input);
  const response = await client.send(command);
  return response.QueryExecutionId;
};

// https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/client/athena/command/GetQueryResultsCommand/
const getQueryResults = async (queryExecutionId) => {
  const client = new AthenaClient({});

  const waitQueryFinished = async (queryExecutionId) => {
    const sleep = (msec) => new Promise((resolve) => setTimeout(resolve, msec)); 
    const queryDone = (response) => {
      const queryStatus = response.QueryExecution.Status.State;
      return queryStatus === "SUCCEEDED" || queryStatus === "FAILED" | queryStatus === "CANCELLED";
    }

    const input = {
      QueryExecutionId: queryExecutionId,
    }
    const command = new GetQueryExecutionCommand(input);
    while(true){
      const response = await client.send(command);
      if(queryDone(response)) {
        return response.QueryExecution.Status.State;
      }
      await sleep(10000);//10秒待機
    }
  }

  // クエリ完了まで待機
  const queryStatus = await waitQueryFinished(queryExecutionId);
  
  // queryStatus が SUCCEEDED だったら、クエリ結果を取得して返却
  if (queryStatus === "SUCCEEDED") {
    const input = {
      QueryExecutionId: queryExecutionId,
    };
    const command = new GetQueryResultsCommand(input);
    const response = await client.send(command);
    return response;
  } else {
    throw new Error (`Query Failed. status:${queryStatus}`);
  }
  
};

const run = async () => {

  // パラメータの準備
  const namedQueryId = "実行したい、名前付きQueryのID情報";
  const executionParameters = [""]; // 名前付き Query に後埋めのパラメータがあれば、それを設定
  
  // Athena へクエリ
  const QueryString = await getQueryStringById(namedQueryId);
  const queryExecutionId = await startQueryExecution(QueryString, executionParameters);
  const queryResult = await getQueryResults(queryExecutionId);

  // 以下、頑張って query result をパース
  
  return queryResult


};

run().then(console.log);


Discussion