👁️‍🗨️

Go で Athena (sdk v2) 触ってみた

2023/10/13に公開

Go で Athena (sdk v2) 触ってみた

気になったのでやってみた系です。

Golang で Amazon Athena を触りたかったのですが、調べてみるとほとんどが SDK v1 で実装されてました。v2 で実装したかったので備忘録兼ねて書いていきます。

ちなみに grep.app は非常におすすめです。

Github 上の OSS などから grep してきてくれます。

こう言った「ドキュメントは読んだけど実際の使われ方もみたい」時に重宝します。実際自分もathena.clientで調べて、ChatGPT と会話しながら、ドキュメント見ながら、書いていきました。

package install

go get "github.com/aws/aws-sdk-go-v2/service/athena"

クライアントの作成

type AthenaClient struct {
    *athena.Client
    database  string
    table     string
    outputLoc string
}

func NewAthenaClient() *AthenaClient {
    outputLoc := "s3://" + <output-bucket-name> + "/"

    return &AthenaClient{
        Client:    athena.NewFromConfig(<aws-config>),
        database:  <database-name>,
        table:     <table-name>,
        outputLoc: outputLoc,
    }
}

Output Location, Database, Table の概念については公式ドキュメントを参照してください。

また、アプリケーション側では Output Location, Database, Table の作成に関しては責任を持ちません。Terraform やマネコンから作成済みであることを想定しています。

実装

マネコンからの操作だとなかなか気づきにくいですが、Athena へクエリを投げるのは以下の 3 つのセクションからなっています。

  1. クエリを投げる(Enqueue)
  2. クエリの結果を待つ(Polling, Wait)
  3. 結果を取得する(Result)

注意事項としては、2 の Polling のところでは Exponential-BackOff を実装すること(Athena API を叩く回数は抑えた方がいいため)、3 の結果は paginated であることです。

const (
    maxRetries = 10
    baseDelay  = 2 * time.Second
    maxDelay   = 120 * time.Second
)

// 1. クエリを投げる
func (c *AthenaClient) EnqueueQuery(ctx context.Context) (*string, error) {
    query := `SELECT * FROM <table-name>`

    in := &athena.StartQueryExecutionInput{
        QueryString: &query,
        QueryExecutionContext: &types.QueryExecutionContext{
            Database: &c.database,
        },
        ResultConfiguration: &types.ResultConfiguration{
            OutputLocation: &c.outputLoc,
        },
    }

    start, err := c.StartQueryExecution(ctx, in)
    if err != nil {
        return nil, fmt.Errorf("error in EnqueueQuery: %w", err)
    }

    return start.QueryExecutionId, nil
}

// 2. クエリの結果を待つ(Polling, Wait)
func (c *AthenaClient) WaitQuery(ctx context.Context, execID string) error {
    in := &athena.GetQueryExecutionInput{
        QueryExecutionId: &execID,
    }

    for retry := 0; retry < maxRetries; retry++ {
        out, err := c.GetQueryExecution(ctx, in)
        if err != nil {
            return fmt.Errorf("error in WaitQuery: %w", err)
        }

        switch out.QueryExecution.Status.State {
        case types.QueryExecutionStateFailed:
            return errors.New("query execution failed")
        case types.QueryExecutionStateCancelled:
            return errors.New("query execution was cancelled")
        case types.QueryExecutionStateSucceeded:
            return nil
        case types.QueryExecutionStateRunning, types.QueryExecutionStateQueued:
        default:
            return fmt.Errorf("unexpected query state %s for query %s", out.QueryExecution.Status.State, execID)
        }

        delay := utils.ComputeDelay(retry, maxDelay, baseDelay)
        time.Sleep(delay)
    }
    return errors.New("execution count reached max retry")
}

// 3. 結果を取得する(Result)
func (c *AthenaClient) GetResult(ctx context.Context, execID string) ([]<domain-model>, error) {
    var allRows []types.Row
    var nextToken *string

    for {
        in := &athena.GetQueryResultsInput{
            QueryExecutionId: &execID,
            NextToken:        nextToken,
        }
        out, err := c.GetQueryResults(ctx, in)
        if err != nil {
            return nil, err
        }

        if len(allRows) == 0 {
            allRows = out.ResultSet.Rows
        } else {
            allRows = append(allRows, out.ResultSet.Rows...)
        }

        if out.NextToken == nil {
            break
	}
        nextToken = out.NextToken
    }

    dos, err := toDomain(allRows)
    if err != nil {
        return nil, fmt.Errorf("error in GetResult: %w", err)
    }
    return dos, nil
}

// utils.go
func ComputeDelay(retry int, maxDelay, baseDelay time.Duration) time.Duration {
    delay := time.Duration(math.Pow(2, float64(retry))) * baseDelay
    if delay > maxDelay {
        return maxDelay
    }
    return delay
}

あとは上位層からこれを呼び出してあげましょう。

最後に

以上です。読んでいただきありがとうございました!

Discussion