AWS SAMとEventBridgeを使用したLambda関数の定期実行
はじめに
AWS SAMを使用して、Lambdaを定期的に実行するためにEventBridgeを利用しました。
SAMのテンプレート(template.yaml)にEventBridgeのルールを定義するだけで、Lambdaを定期的に実行することが可能です。
今回は、LambdaをGo言語で実装しました。
後ほど具体的な実装に触れますが、シンプルに文字列を表示し、定期的に動作していることが一目でわかるようにしています。
最後に、おまけとしてLambdaの実装について紹介します。この実装では、bitflyerが公開しているAPIを使用して仮想通貨の販売価格をS3に保存する内容になっています。
準備
SAMのセットアップが完了したら、テンプレートにEventBridgeのルールを定義します。
その後、SAMをデプロイすると、テンプレートからEventBridgeが作成され、Lambdaが定期的に実行されるようになります。
AWS SAMの準備
SAMのセットアップ行います。
参考になる記事「LambdaとGolangで始めるサーバレス開発」には、セットアップ手順が記載されていますので、そちらをご参照ください。
EventBridge スケジューライベントの作成
template.yamlのEvents
プロパティを下記のように編集します。
Resources:
HelloWorldFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: hello-world/
Handler: hello-world
Runtime: go1.x
Architectures:
- x86_64
Events:
ScheduledFunction:
Type: Schedule
Properties:
Schedule: cron(0/5 * * * ? *)
Enabled: true
Name: TestSchedule
Description: test schedule
template.yamlの全体
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: |
sam-example-cron
Sample SAM Template for sam-example-cron
Globals:
Function:
Timeout: 5
MemorySize: 128
Resources:
HelloWorldFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: hello-world/
Handler: hello-world
Runtime: go1.x
Architectures:
- x86_64
Events:
ScheduledFunction:
Type: Schedule
Properties:
Schedule: cron(0/5 * * * ? *)
Enabled: true
Name: TestSchedule
Description: test schedule
Environment:
Variables:
PARAM1: VALUE
ApplicationResourceGroup:
Type: AWS::ResourceGroups::Group
Properties:
Name: !Sub ApplicationInsights-SAM-${AWS::StackName}
ResourceQuery:
Type: CLOUDFORMATION_STACK_1_0
ApplicationInsightsMonitoring:
Type: AWS::ApplicationInsights::Application
Properties:
ResourceGroupName: !Ref ApplicationResourceGroup
AutoConfigurationEnabled: 'true'
EventBridgeのスケジュールのプロパティは数多くありますが、最低限必要なものを定義しています。5分ごとにLambdaを実行するように設定しています。公式では、他のプロパティも紹介されていますので、気になる方は参照してください。
TypeでSchedule
としていますが、ScheduleV2
も利用可能です。
ScheduleはUTCに基づいて動作し、ScheduleV2はタイムゾーンに依存します。
ScheduleV2では、プロパティScheduleExpressionTimezoneを使用して、スケジュール式の評価に使用するタイムゾーンを設定できます。
Lambadaの作成
package main
import (
"fmt"
"github.com/aws/aws-lambda-go/lambda"
)
func handler() {
fmt.Println("Hello")
}
func main() {
lambda.Start(handler)
}
定期実行する
SAMをデプロイします。
sam deploy --guided
質問に対する回答はこちらの記事を参考にしてください。
デプロイが成功すると、Lambda関数の詳細画面にアクセスすると、EventBridgeがトリガーとして設定されて動作していることを確認できます。
結果の確認
ログが定期的に出力されているかを確認するために、CloudWatchのコンソール画面にアクセスします。
5分ごとにログが出力されていることを確認できました!
おまけ
これまで、ログをCloudWatchに出力していましたが、少し変更して、定期実行の結果をS3に保存するようにしてみます。また、単純に現在時刻を保存するだけでは実装が単純すぎるため、以前に作成した記事の「LambdaとGolangで始めるサーバレス開発」を使用して仮想通貨の販売価格をJSON形式でS3に保存するように実装します。
S3の準備
S3バケットを作成し、そのバケット名をAWS Systems Manager パラメータストアに設定する方法について説明しています。
S3バケットの作成
S3バケットを作成します。
aws s3 mb s3://[バケット名]
S3のバケット名をAWS Systems Manager パラメータストアに設定
実装において、バケット名の管理方法はさまざまありますが、今回はAWS Systems Managerのパラメータストアを使用して管理し、値を取得するようにします。
具体的な設定方法については説明しませんが、以下の記事が参考になりました。
AWS SAMでのアプリケーションのテンプレート設定と名称変更の手順
最初に、AWS SAMのセットアップ時にアプリケーションのテンプレートを「Hello World Example」に設定したため、フォルダやファイル名などがデフォルトの「hello-world」となっています。今回はbitflyerからデータを取得する実装を行うため、これらの名称を「bitflyer」に変更します。
※ここでは任意の名前に変更していただいても問題ありませんが、後述で登場する「bitflyer」の部分を変更した名前に置き換えてください。
- template.yamlの変更
+ BitflyerFunction:
- HelloWorldFunction:
Properties:
+ CodeUri: bitflyer/
+ Handler: bitflyer
- CodeUri: hello-world/
- Handler: hello-world
Runtime: go1.x
-
作成プロジェクト直下のhello-worldフォルダ名を変更
hello-world => bitflyer -
go.modのモジュール名を変更
+ module bitflyer
- module hello-world
実装
構成のイメージです。
|--bitflyer
| |--api
| | |--market_api.go
| |--awslib
| | |--s3.go
| | |--systemManager.go
| |--go.mod
| |--go.sum
| |--main.go
| |--utils
| | |--http.utils.go
エントリーポイント
main.goの実装
package main
import (
"bitflyer/api"
"bitflyer/awslib"
"bitflyer/utils"
"encoding/json"
"fmt"
"github.com/aws/aws-lambda-go/lambda"
)
func handler() {
// 各種通貨のコードを取得
markets, err := api.GetMarkets()
if err != nil {
fmt.Printf("Error getting marks: %s\n", err)
return
}
var tickers []api.Ticker
for _, market := range markets {
// 通貨ごとのTickerを取得
ticker, err := api.GetTicker(market.ProductCode)
if err != nil {
fmt.Printf("Error getting ticker: %s\n", err)
return
}
tickers = append(tickers, *ticker)
}
jsonTickers, err := json.Marshal(tickers)
if err != nil {
fmt.Printf("Error marshaling ticker data: %s\n", err)
return
}
// フォルダとオブジェクトキーの作成
now := utils.GetTimeNow()
folderPath := fmt.Sprintf("%d/%02d/%02d", now.Year(), now.Month(), now.Day()) // S3への保存パス
objectKey := folderPath + fmt.Sprintf("/%02d%02d_ticker.json", now.Hour(), now.Minute()) // S3への保存ファイル名
// バケット名の取得
bucketName, err := awslib.GetParameterStore("bitflyer-s3-bucket")
if err != nil {
fmt.Printf("Error getting parameter store: %s\n", err)
return
}
err = awslib.SaveToS3(jsonTickers, bucketName, objectKey)
if err != nil {
fmt.Printf("Error saving ticker data to S3: %s\n", err)
return
}
fmt.Println("Ticker data saved to S3 successfully.")
}
func main() {
lambda.Start(handler)
}
bitflyer提供のAPI呼び出し
market_api.goの実装
package api
import (
"bitflyer/utils"
"encoding/json"
)
type Market struct {
ProductCode string `json:"product_code"`
MarketType string `json:"market_type"`
}
type Ticker struct {
ProductCode string `json:"product_code"`
State string `json:"state"`
TimeStamp string `json:"timestamp"`
TickID int `json:"tick_id"`
BestBid float64 `json:"best_bid"`
BestAsk float64 `json:"best_ask"`
BestBidSize float64 `json:"best_bid_size"`
BestAskSize float64 `json:"best_ask_size"`
TotalBidDepth float64 `json:"total_bid_depth"`
TotalAskDepth float64 `json:"total_ask_depth"`
Ltp float64 `json:"ltp"`
Volume float64 `json:"volume"`
VolumeByProduct float64 `json:"volume_by_product"`
}
var (
// マーケット一覧の取得URL
MarketsURL = "https://api.bitflyer.com//v1/markets"
// Tickerの取得URL
TickerURL = "https://api.bitflyer.com/v1/ticker"
)
// マーケット一覧を取得します
func GetMarkets() ([]Market, error) {
res, err := utils.HttpRequest("GET", MarketsURL, map[string]string{})
if err != nil {
return nil, err
}
var markets []Market
err = json.Unmarshal(res, &markets)
if err != nil {
return nil, err
}
return markets, nil
}
// Ticker情報を取得します
func GetTicker(code string) (*Ticker, error) {
res, err := utils.HttpRequest("GET", TickerURL, map[string]string{"product_code": code})
if err != nil {
return &Ticker{}, err
}
var ticker Ticker
err = json.Unmarshal(res, &ticker)
if err != nil {
return &Ticker{}, err
}
return &ticker, nil
}
S3へのデータ保存
s3.goの実装
package awslib
import (
"bytes"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
// S3へのデータ保存
func SaveToS3(data []byte, bucketName, objectKey string) error {
// AWSセッションの作成
sess, err := session.NewSession(&aws.Config{
Region: aws.String("ap-northeast-1"),
})
if err != nil {
return err
}
// S3サービスクライアントの作成
s3Client := s3.New(sess)
// S3オブジェクトの作成とデータのアップロード
_, err = s3Client.PutObject(&s3.PutObjectInput{
Body: aws.ReadSeekCloser(bytes.NewReader(data)),
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
})
if err != nil {
return err
}
return nil
}
AWS System Managerからパラメータストアを取得
systemManager.goの実装
package awslib
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ssm"
)
// System Managerからパラメータストアを取得する
func GetParameterStore(key string) (string, error) {
sess := session.Must(session.NewSession())
svc := ssm.New(
sess,
aws.NewConfig().WithRegion("ap-northeast-1"),
)
res, err := svc.GetParameter(&ssm.GetParameterInput{
Name: aws.String(key),
WithDecryption: aws.Bool(true),
})
if err != nil {
return "", err
}
return *res.Parameter.Value, nil
}
httpリクエスト処理
http.utils.go
package utils
import (
"io/ioutil"
"net/http"
"time"
)
func HttpRequest(method, url string, query map[string]string) ([]byte, error) {
req, err := http.NewRequest(method, url, nil)
if err != nil {
return nil, err
}
q := req.URL.Query()
for key, val := range query {
q.Add(key, val)
}
req.URL.RawQuery = q.Encode()
client := new(http.Client)
res, err := client.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, err
}
return body, nil
}
func GetTimeNow() time.Time {
// タイムゾーンを日本時間に設定
jst := time.FixedZone("Asia/Tokyo", 9*60*60)
// 日本の現在時間を取得
now := time.Now().In(jst)
return now
}
実行結果
実装が完了し、再度「sam build」と「sam deploy」コマンドを実行すると、5分ごとにJSONファイルがS3バケットに出力されていることを確認できます。
JSONファイルの中身が正しく出力されていることも確認できます。
Discussion