AWS SDK for JavaScriptを使ってAthenaの実行結果を取得する

2021/05/19に公開

Athena は標準 SQL を使って S3 のファイルをクエリできる便利なサービスです。AWS のマネジメントコンソールから実行できますが、Athena の実行結果をフロントエンドから扱う必要があり、実行結果を API のレスポンスとして返す必要がありました。

非同期なAthenaのクエリ結果を取得するにはどうするか

Athena はクエリが非同期に実行されるので、AWS SDK から扱う場合は少し癖があります。結論としてはポーリングをしないといけません。Athena を SDK 経由で実行し、実行結果を毎回問い合わせてクエリの実行が終わったら結果を返すというかたちにする必要があります。

Node.jsで実装する

AWS SDK for JavaScript を使うので、Node.js(TypeScript)で実装します。以下はコードの全体像です。Athena のクエリ用のデータは AWS 公式が提供している ELB のアクセスログのサンプル S3 データを使用しています。

import * as AWS from "aws-sdk";
const athena = new AWS.Athena({
  region: "ap-northeast-1",
});

const sql = `
select * from elb_logs WHERE elb_name = 'elb_demo_001' ORDER BY request_timestamp ASC LIMIT 5
`;

/**
 * Athenaのクエリ結果を同期的に取得する
 */
async function getQueryResultSync(
  sql: string,
  nextToken?: string,
  limit?: number
): Promise<AWS.Athena.Types.GetQueryResultsOutput> {
  const startQueryExecutionInput: AWS.Athena.StartQueryExecutionInput = {
    QueryString: sql,
    ResultConfiguration: {
      OutputLocation: `s3://xxxxxxxxxxxxxxxx/result/`,
    },
    QueryExecutionContext: {
      Database: "sampledb",
    },
  };
  const startQuery = await athena
    .startQueryExecution(startQueryExecutionInput)
    .promise();

  let getQueryExecution;
  while (true) {
    const getQueryExecutionInput: AWS.Athena.GetQueryExecutionInput = {
      QueryExecutionId: startQuery.QueryExecutionId!,
    };
    getQueryExecution = await athena
      .getQueryExecution(getQueryExecutionInput)
      .promise();
    if (
      getQueryExecution.QueryExecution?.Status?.State === "RUNNING" ||
      getQueryExecution.QueryExecution?.Status?.State === "QUEUED"
    ) {
      console.info("Athenaのクエリステータスをポーリングしています...");
      await new Promise((resolve) => setTimeout(resolve, 1000));
    } else {
      break;
    }
  }

  const getQueryResultsInput: AWS.Athena.GetQueryResultsInput = {
    QueryExecutionId: getQueryExecution.QueryExecution?.QueryExecutionId!,
    NextToken: nextToken,
    MaxResults: limit,
  };
  const result = await athena.getQueryResults(getQueryResultsInput).promise();
  return result;
}

async function main() {
  const result = await getQueryResultSync(sql);
  const resultJson = result.ResultSet!.Rows!.map((row: any) => {
    return {
      requestTimeStamp: row!.Data![0].VarCharValue,
      elbName: row!.Data![1].VarCharValue,
      requestIp: row!.Data![2].VarCharValue,
      requestPort: row!.Data![3].VarCharValue,
      backendIp: row!.Data![4].VarCharValue,
      backendPort: row!.Data![5].VarCharValue,
      elbResponseCode: row!.Data![6].VarCharValue,
      sentBytes: row!.Data![7].VarCharValue,
    };
  });

  console.log(resultJson);
}

main().catch((e) => console.error(e));

1 つ 1 つ見ていきます。

Athenaのクエリを実行する

まず、AWS SDK の startQueryExecution API に SQL とデータベース名を引数にし実行します。これによりクエリの実行がスタートします。実行がスタートしただけでこの状態だとまだ実行結果は取得できません。


const startQueryExecutionInput: AWS.Athena.StartQueryExecutionInput = {
    QueryString: sql,
    ResultConfiguration: {
        OutputLocation: `s3://xxxxxxxxx/result`,
    },
    QueryExecutionContext: {
        Database: 'sampledb',
    },
};
const startQuery = await athena
    .startQueryExecution(startQueryExecutionInput)
    .promise();

Athenaのクエリ実行ステータスをポーリングする

スタートしたクエリの実行結果を取得するために以下のようにループで、getQueryExecution API を実行します。startQueryExecution の実行結果で queryExecutionId が取得できるのでそれを引数に設定しています。実行結果のステータスは全部で 5 つあります。実行結果を取得し、RUNNINGQUEUED の場合は、1 秒待ってからまたループさせます。それ以外のステータスの場合は実行が終わっているので、ループからぬけるというような実装になります。

  • QUEUED
  • RUNNING
  • SUCCEEDED
  • FAILED
  • CANCELLED
let getQueryExecution;
while(true) {
    const getQueryExecutionInput: AWS.Athena.GetQueryExecutionInput = {
        QueryExecutionId: startQuery.QueryExecutionId,
    };
    getQueryExecution = await athena
        .getQueryExecution(getQueryExecutionInput)
        .promise();
    if (
        getQueryExecution.QueryExecution?.Status?.State === 'RUNNING' ||
        getQueryExecution.QueryExecution?.Status?.State === 'QUEUED'
    ) {
        console.info('Athenaのクエリステータスをポーリングしています...');
        await new Promise((resolve) => setTimeout(resolve, 1000));
    } else {
        break;
    }
}

実行結果を取得する

ステータスが SUCCEEDED の場合は、getQueryResults API で実行結果を取得できます。

const getQueryResultsInput: AWS.Athena.GetQueryResultsInput = {
        QueryExecutionId: getQueryExecution.QueryExecutionId!
    };
const result = await athena.getQueryResults(getQueryResultsInput).promise();

取得したデータを加工する

最後に、この getQueryResultSync 関数を使って取得したデータを加工します。取得したオブジェクトの ResultSetRows 配列の中に SQL の SELECT で記述した順番にデータ格納されています。

async function main() {
  const result = await getQueryResultSync(sql);
  const resultJson = result.ResultSet!.Rows!.map((row: any) => {
    return {
      requestTimeStamp: row!.Data![0].VarCharValue,
      elbName: row!.Data![1].VarCharValue,
      requestIp: row!.Data![2].VarCharValue,
      requestPort: row!.Data![3].VarCharValue,
      backendIp: row!.Data![4].VarCharValue,
      backendPort: row!.Data![5].VarCharValue,
      elbResponseCode: row!.Data![6].VarCharValue,
      sentBytes: row!.Data![7].VarCharValue,
    };
  });

  console.log(resultJson);
}

main().catch((e) => console.error(e));

実行してみる

実行すると以下のように Athena の実行結果が JSON として取得できました。

npx ts-node index.ts                                                                                                [cm-sato]5/19 09:21:50 2021
Athenaのクエリステータスをポーリングしています...
Athenaのクエリステータスをポーリングしています...
[
  {
    requestTimeStamp: 'request_timestamp',
    elbName: 'elb_name',
    requestIp: 'request_ip',
    requestPort: 'request_port',
    backendIp: 'backend_ip',
    backendPort: 'backend_port',
    elbResponseCode: 'request_processing_time',
    sentBytes: 'backend_processing_time'
  },
  {
    requestTimeStamp: '2015-01-01T00:00:02.793335Z',
    elbName: 'elb_demo_001',
    requestIp: '253.134.140.61',
    requestPort: '29168',
    backendIp: '172.33.223.180',
    backendPort: '80',
    elbResponseCode: '5.37E-4',
    sentBytes: '0.001668'
  },
  {
    requestTimeStamp: '2015-01-01T00:00:07.858849Z',
    elbName: 'elb_demo_001',
    requestIp: '253.37.195.32',
    requestPort: '16245',
    backendIp: '172.41.178.157',
    backendPort: '8888',
    elbResponseCode: '0.001719',
    sentBytes: '3.82E-4'
  },
  {
    requestTimeStamp: '2015-01-01T00:00:09.009322Z',
    elbName: 'elb_demo_001',
    requestIp: '253.178.154.54',
    requestPort: '10846',
    backendIp: '172.53.30.177',
    backendPort: '80',
    elbResponseCode: '1.09E-4',
    sentBytes: '0.001712'
  },
  {
    requestTimeStamp: '2015-01-01T00:00:10.140144Z',
    elbName: 'elb_demo_001',
    requestIp: '248.166.79.147',
    requestPort: '13756',
    backendIp: '172.44.132.67',
    backendPort: '8888',
    elbResponseCode: '2.75E-4',
    sentBytes: '7.96E-4'
  },
  {
    requestTimeStamp: '2015-01-01T00:00:10.670872Z',
    elbName: 'elb_demo_001',
    requestIp: '244.73.76.74',
    requestPort: '25207',
    backendIp: '172.51.126.72',
    backendPort: '80',
    elbResponseCode: '0.001182',
    sentBytes: '5.09E-4'
  }
]

まとめ

Athena の実行結果を AWS SDK for JavaScript から取得する方法について書きました。Athena の実行結果をフロントエンドから扱いたい場合などに使える方法かと思います。

GitHubで編集を提案

Discussion