AWS SDK for JavaScriptを使ってAthenaの実行結果を取得する

commits8 min read

Athenaは標準SQLを使ってS3のファイルをクエリすることができる便利なサービスです。AWSのマネジメントコンソールから実行できますが、Athenaの実行結果をフロントエンドから扱う必要があり、実行結果をAPIのレスポンスとして返す必要がありました。

非同期なAthenaのクエリ結果を取得するにはどうするか

Athenaはクエリが非同期に実行されるので、AWS SDKから扱う場合は少し癖があります。結論としてはポーリングをしないといけません。AthenaをSDK経由で実行し、実行結果を毎回問い合わせてクエリの実行が終わったら結果を返すというかたちにする必要があります。

Node.jsで実装する

AWS SDK for JavaScriptを使うので、Node.js(TypeScript)で実装します。以下はコードの全体像です。Athenaのクエリ用のデータはAWS公式が提供しているELBのアクセスログのサンプルS3データを使用しています。

import * as AWS from "aws-sdk";
const athena = new AWS.Athena({
  region: "ap-northeast-1",
});

const sql = `
select * from elb_logs WHERE elb_name = 'elb_demo_001' ORDER BY request_timestamp ASC LIMIT 5
`;

/**
 * Athenaのクエリ結果を同期的に取得する
 */
async function getQueryResultSync(
  sql: string,
  nextToken?: string,
  limit?: number
): Promise<AWS.Athena.Types.GetQueryResultsOutput> {
  const startQueryExecutionInput: AWS.Athena.StartQueryExecutionInput = {
    QueryString: sql,
    ResultConfiguration: {
      OutputLocation: `s3://xxxxxxxxxxxxxxxx/result/`,
    },
    QueryExecutionContext: {
      Database: "sampledb",
    },
  };
  const startQuery = await athena
    .startQueryExecution(startQueryExecutionInput)
    .promise();

  let getQueryExecution;
  while (true) {
    const getQueryExecutionInput: AWS.Athena.GetQueryExecutionInput = {
      QueryExecutionId: startQuery.QueryExecutionId!,
    };
    getQueryExecution = await athena
      .getQueryExecution(getQueryExecutionInput)
      .promise();
    if (
      getQueryExecution.QueryExecution?.Status?.State === "RUNNING" ||
      getQueryExecution.QueryExecution?.Status?.State === "QUEUED"
    ) {
      console.info("Athenaのクエリステータスをポーリングしています...");
      await new Promise((resolve) => setTimeout(resolve, 1000));
    } else {
      break;
    }
  }

  const getQueryResultsInput: AWS.Athena.GetQueryResultsInput = {
    QueryExecutionId: getQueryExecution.QueryExecution?.QueryExecutionId!,
    NextToken: nextToken,
    MaxResults: limit,
  };
  const result = await athena.getQueryResults(getQueryResultsInput).promise();
  return result;
}

async function main() {
  const result = await getQueryResultSync(sql);
  const resultJson = result.ResultSet!.Rows!.map((row: any) => {
    return {
      requestTimeStamp: row!.Data![0].VarCharValue,
      elbName: row!.Data![1].VarCharValue,
      requestIp: row!.Data![2].VarCharValue,
      requestPort: row!.Data![3].VarCharValue,
      backendIp: row!.Data![4].VarCharValue,
      backendPort: row!.Data![5].VarCharValue,
      elbResponseCode: row!.Data![6].VarCharValue,
      sentBytes: row!.Data![7].VarCharValue,
    };
  });

  console.log(resultJson);
}

main().catch((e) => console.error(e));

1つ1つ見ていきます。

Athenaのクエリを実行する

まず、AWS SDKの startQueryExecution APIにSQLとデータベース名を引数にし実行します。これによりクエリの実行がスタートします。実行がスタートしただけでこの状態だとまだ実行結果は取得できません。


const startQueryExecutionInput: AWS.Athena.StartQueryExecutionInput = {
    QueryString: sql,
    ResultConfiguration: {
        OutputLocation: `s3://xxxxxxxxx/result`,
    },
    QueryExecutionContext: {
        Database: 'sampledb',
    },
};
const startQuery = await athena
    .startQueryExecution(startQueryExecutionInput)
    .promise();

Athenaのクエリ実行ステータスをポーリングする

スタートしたクエリの実行結果を取得するために以下のようにループで、getQueryExecution APIを実行します。startQueryExecution の実行結果で queryExecutionId が取得できるのでそれを引数に設定しています。実行結果のステータスは全部で5つあります。実行結果を取得し、RUNNINGQUEUED の場合は、1秒待ってからまたループさせます。それ以外のステータスの場合は実行が終わっているので、ループからぬけるというような実装になります。

  • QUEUED
  • RUNNING
  • SUCCEEDED
  • FAILED
  • CANCELLED
let getQueryExecution;
while(true) {
    const getQueryExecutionInput: AWS.Athena.GetQueryExecutionInput = {
        QueryExecutionId: startQuery.QueryExecutionId,
    };
    getQueryExecution = await athena
        .getQueryExecution(getQueryExecutionInput)
        .promise();
    if (
        getQueryExecution.QueryExecution?.Status?.State === 'RUNNING' ||
        getQueryExecution.QueryExecution?.Status?.State === 'QUEUED'
    ) {
        console.info('Athenaのクエリステータスをポーリングしています...');
        await new Promise((resolve) => setTimeout(resolve, 1000));
    } else {
        break;
    }
}

実行結果を取得する

ステータスが SUCCEEDED の場合は、getQueryResults APIで実行結果を取得できます。

const getQueryResultsInput: AWS.Athena.GetQueryResultsInput = {
        QueryExecutionId: getQueryExecution.QueryExecutionId!
    };
const result = await athena.getQueryResults(getQueryResultsInput).promise();

取得したデータを加工する

最後に、この getQueryResultSync 関数を使って取得したデータを加工します。取得したオブジェクトの ResultSetRows 配列の中にSQLのSELECTで記述した順番にデータ格納されています。

async function main() {
  const result = await getQueryResultSync(sql);
  const resultJson = result.ResultSet!.Rows!.map((row: any) => {
    return {
      requestTimeStamp: row!.Data![0].VarCharValue,
      elbName: row!.Data![1].VarCharValue,
      requestIp: row!.Data![2].VarCharValue,
      requestPort: row!.Data![3].VarCharValue,
      backendIp: row!.Data![4].VarCharValue,
      backendPort: row!.Data![5].VarCharValue,
      elbResponseCode: row!.Data![6].VarCharValue,
      sentBytes: row!.Data![7].VarCharValue,
    };
  });

  console.log(resultJson);
}

main().catch((e) => console.error(e));

実行してみる

実行すると以下のようにAthenaの実行結果がJSONとして取得できました。

npx ts-node index.ts                                                                                                [cm-sato]5/19 09:21:50 2021
Athenaのクエリステータスをポーリングしています...
Athenaのクエリステータスをポーリングしています...
[
  {
    requestTimeStamp: 'request_timestamp',
    elbName: 'elb_name',
    requestIp: 'request_ip',
    requestPort: 'request_port',
    backendIp: 'backend_ip',
    backendPort: 'backend_port',
    elbResponseCode: 'request_processing_time',
    sentBytes: 'backend_processing_time'
  },
  {
    requestTimeStamp: '2015-01-01T00:00:02.793335Z',
    elbName: 'elb_demo_001',
    requestIp: '253.134.140.61',
    requestPort: '29168',
    backendIp: '172.33.223.180',
    backendPort: '80',
    elbResponseCode: '5.37E-4',
    sentBytes: '0.001668'
  },
  {
    requestTimeStamp: '2015-01-01T00:00:07.858849Z',
    elbName: 'elb_demo_001',
    requestIp: '253.37.195.32',
    requestPort: '16245',
    backendIp: '172.41.178.157',
    backendPort: '8888',
    elbResponseCode: '0.001719',
    sentBytes: '3.82E-4'
  },
  {
    requestTimeStamp: '2015-01-01T00:00:09.009322Z',
    elbName: 'elb_demo_001',
    requestIp: '253.178.154.54',
    requestPort: '10846',
    backendIp: '172.53.30.177',
    backendPort: '80',
    elbResponseCode: '1.09E-4',
    sentBytes: '0.001712'
  },
  {
    requestTimeStamp: '2015-01-01T00:00:10.140144Z',
    elbName: 'elb_demo_001',
    requestIp: '248.166.79.147',
    requestPort: '13756',
    backendIp: '172.44.132.67',
    backendPort: '8888',
    elbResponseCode: '2.75E-4',
    sentBytes: '7.96E-4'
  },
  {
    requestTimeStamp: '2015-01-01T00:00:10.670872Z',
    elbName: 'elb_demo_001',
    requestIp: '244.73.76.74',
    requestPort: '25207',
    backendIp: '172.51.126.72',
    backendPort: '80',
    elbResponseCode: '0.001182',
    sentBytes: '5.09E-4'
  }
]

まとめ

Athenaの実行結果をAWS SDK for JavaScriptから取得する方法について書きました。Athenaの実行結果をフロントエンドから扱いたい場合などに使える方法かと思います。

GitHubで編集を提案

Discussion

ログインするとコメントできます