👁️🗨️
Go で Athena (sdk v2) 触ってみた
Go で Athena (sdk v2) 触ってみた
気になったのでやってみた系です。
Golang で Amazon Athena を触りたかったのですが、調べてみるとほとんどが SDK v1 で実装されてました。v2 で実装したかったので備忘録兼ねて書いていきます。
ちなみに grep.app は非常におすすめです。
Github 上の OSS などから grep してきてくれます。
こう言った「ドキュメントは読んだけど実際の使われ方もみたい」時に重宝します。実際自分もathena.client
で調べて、ChatGPT と会話しながら、ドキュメント見ながら、書いていきました。
package install
go get "github.com/aws/aws-sdk-go-v2/service/athena"
クライアントの作成
type AthenaClient struct {
*athena.Client
database string
table string
outputLoc string
}
func NewAthenaClient() *AthenaClient {
outputLoc := "s3://" + <output-bucket-name> + "/"
return &AthenaClient{
Client: athena.NewFromConfig(<aws-config>),
database: <database-name>,
table: <table-name>,
outputLoc: outputLoc,
}
}
Output Location, Database, Table の概念については公式ドキュメントを参照してください。
また、アプリケーション側では Output Location, Database, Table の作成に関しては責任を持ちません。Terraform やマネコンから作成済みであることを想定しています。
実装
マネコンからの操作だとなかなか気づきにくいですが、Athena へクエリを投げるのは以下の 3 つのセクションからなっています。
- クエリを投げる(Enqueue)
- クエリの結果を待つ(Polling, Wait)
- 結果を取得する(Result)
注意事項としては、2 の Polling のところでは Exponential-BackOff を実装すること(Athena API を叩く回数は抑えた方がいいため)、3 の結果は paginated であることです。
const (
maxRetries = 10
baseDelay = 2 * time.Second
maxDelay = 120 * time.Second
)
// 1. クエリを投げる
func (c *AthenaClient) EnqueueQuery(ctx context.Context) (*string, error) {
query := `SELECT * FROM <table-name>`
in := &athena.StartQueryExecutionInput{
QueryString: &query,
QueryExecutionContext: &types.QueryExecutionContext{
Database: &c.database,
},
ResultConfiguration: &types.ResultConfiguration{
OutputLocation: &c.outputLoc,
},
}
start, err := c.StartQueryExecution(ctx, in)
if err != nil {
return nil, fmt.Errorf("error in EnqueueQuery: %w", err)
}
return start.QueryExecutionId, nil
}
// 2. クエリの結果を待つ(Polling, Wait)
func (c *AthenaClient) WaitQuery(ctx context.Context, execID string) error {
in := &athena.GetQueryExecutionInput{
QueryExecutionId: &execID,
}
for retry := 0; retry < maxRetries; retry++ {
out, err := c.GetQueryExecution(ctx, in)
if err != nil {
return fmt.Errorf("error in WaitQuery: %w", err)
}
switch out.QueryExecution.Status.State {
case types.QueryExecutionStateFailed:
return errors.New("query execution failed")
case types.QueryExecutionStateCancelled:
return errors.New("query execution was cancelled")
case types.QueryExecutionStateSucceeded:
return nil
case types.QueryExecutionStateRunning, types.QueryExecutionStateQueued:
default:
return fmt.Errorf("unexpected query state %s for query %s", out.QueryExecution.Status.State, execID)
}
delay := utils.ComputeDelay(retry, maxDelay, baseDelay)
time.Sleep(delay)
}
return errors.New("execution count reached max retry")
}
// 3. 結果を取得する(Result)
func (c *AthenaClient) GetResult(ctx context.Context, execID string) ([]<domain-model>, error) {
var allRows []types.Row
var nextToken *string
for {
in := &athena.GetQueryResultsInput{
QueryExecutionId: &execID,
NextToken: nextToken,
}
out, err := c.GetQueryResults(ctx, in)
if err != nil {
return nil, err
}
if len(allRows) == 0 {
allRows = out.ResultSet.Rows
} else {
allRows = append(allRows, out.ResultSet.Rows...)
}
if out.NextToken == nil {
break
}
nextToken = out.NextToken
}
dos, err := toDomain(allRows)
if err != nil {
return nil, fmt.Errorf("error in GetResult: %w", err)
}
return dos, nil
}
// utils.go
func ComputeDelay(retry int, maxDelay, baseDelay time.Duration) time.Duration {
delay := time.Duration(math.Pow(2, float64(retry))) * baseDelay
if delay > maxDelay {
return maxDelay
}
return delay
}
あとは上位層からこれを呼び出してあげましょう。
最後に
以上です。読んでいただきありがとうございました!
Discussion