📝

AWS SAMとEventBridgeを使用したLambda関数の定期実行

2023/07/17に公開

はじめに

AWS SAMを使用して、Lambdaを定期的に実行するためにEventBridgeを利用しました。
SAMのテンプレート(template.yaml)にEventBridgeのルールを定義するだけで、Lambdaを定期的に実行することが可能です。

今回は、LambdaをGo言語で実装しました。
後ほど具体的な実装に触れますが、シンプルに文字列を表示し、定期的に動作していることが一目でわかるようにしています。
最後に、おまけとしてLambdaの実装について紹介します。この実装では、bitflyerが公開しているAPIを使用して仮想通貨の販売価格をS3に保存する内容になっています。

準備


SAMのセットアップが完了したら、テンプレートにEventBridgeのルールを定義します。
その後、SAMをデプロイすると、テンプレートからEventBridgeが作成され、Lambdaが定期的に実行されるようになります。

AWS SAMの準備

SAMのセットアップ行います。
参考になる記事「LambdaとGolangで始めるサーバレス開発」には、セットアップ手順が記載されていますので、そちらをご参照ください。

EventBridge スケジューライベントの作成

template.yamlのEventsプロパティを下記のように編集します。

template.yaml
Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: hello-world/
      Handler: hello-world
      Runtime: go1.x
      Architectures:
        - x86_64
      Events:
        ScheduledFunction:
          Type: Schedule
          Properties:
            Schedule: cron(0/5 * * * ? *)
            Enabled: true
            Name: TestSchedule
            Description: test schedule
template.yamlの全体
template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: |
  sam-example-cron
  Sample SAM Template for sam-example-cron
Globals:
  Function:
    Timeout: 5
    MemorySize: 128
Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: hello-world/
      Handler: hello-world
      Runtime: go1.x
      Architectures:
        - x86_64
      Events:
        ScheduledFunction:
          Type: Schedule
          Properties:
            Schedule: cron(0/5 * * * ? *)
            Enabled: true
            Name: TestSchedule
            Description: test schedule
      Environment:
        Variables:
          PARAM1: VALUE
  ApplicationResourceGroup:
    Type: AWS::ResourceGroups::Group
    Properties:
      Name: !Sub ApplicationInsights-SAM-${AWS::StackName}
      ResourceQuery:
        Type: CLOUDFORMATION_STACK_1_0
  ApplicationInsightsMonitoring:
    Type: AWS::ApplicationInsights::Application
    Properties:
      ResourceGroupName: !Ref ApplicationResourceGroup
      AutoConfigurationEnabled: 'true'

EventBridgeのスケジュールのプロパティは数多くありますが、最低限必要なものを定義しています。5分ごとにLambdaを実行するように設定しています。公式では、他のプロパティも紹介されていますので、気になる方は参照してください。

TypeでScheduleとしていますが、ScheduleV2も利用可能です。
ScheduleはUTCに基づいて動作し、ScheduleV2はタイムゾーンに依存します。
ScheduleV2では、プロパティScheduleExpressionTimezoneを使用して、スケジュール式の評価に使用するタイムゾーンを設定できます。

https://qiita.com/baku2san/items/2514ce485731ff7068c5
https://docs.aws.amazon.com/ja_jp/serverless-application-model/latest/developerguide/sam-property-function-schedulev2.html

Lambadaの作成

main.go
package main

import (
	"fmt"

	"github.com/aws/aws-lambda-go/lambda"
)

func handler()  {
	fmt.Println("Hello")
}

func main() {
	lambda.Start(handler)
}

定期実行する

SAMをデプロイします。

sam deploy --guided

質問に対する回答はこちらの記事を参考にしてください。

デプロイが成功すると、Lambda関数の詳細画面にアクセスすると、EventBridgeがトリガーとして設定されて動作していることを確認できます。

結果の確認

ログが定期的に出力されているかを確認するために、CloudWatchのコンソール画面にアクセスします。

5分ごとにログが出力されていることを確認できました!

おまけ

これまで、ログをCloudWatchに出力していましたが、少し変更して、定期実行の結果をS3に保存するようにしてみます。また、単純に現在時刻を保存するだけでは実装が単純すぎるため、以前に作成した記事の「LambdaとGolangで始めるサーバレス開発」を使用して仮想通貨の販売価格をJSON形式でS3に保存するように実装します。

S3の準備

S3バケットを作成し、そのバケット名をAWS Systems Manager パラメータストアに設定する方法について説明しています。

S3バケットの作成

S3バケットを作成します。

aws s3 mb s3://[バケット名]

S3のバケット名をAWS Systems Manager パラメータストアに設定

実装において、バケット名の管理方法はさまざまありますが、今回はAWS Systems Managerのパラメータストアを使用して管理し、値を取得するようにします。

具体的な設定方法については説明しませんが、以下の記事が参考になりました。
https://infraya.work/posts/go_fetch_parameterstore/

AWS SAMでのアプリケーションのテンプレート設定と名称変更の手順

最初に、AWS SAMのセットアップ時にアプリケーションのテンプレートを「Hello World Example」に設定したため、フォルダやファイル名などがデフォルトの「hello-world」となっています。今回はbitflyerからデータを取得する実装を行うため、これらの名称を「bitflyer」に変更します。
※ここでは任意の名前に変更していただいても問題ありませんが、後述で登場する「bitflyer」の部分を変更した名前に置き換えてください。

  1. template.yamlの変更
template.yaml
+ BitflyerFunction:
- HelloWorldFunction:

Properties:
+  CodeUri: bitflyer/
+  Handler: bitflyer
-  CodeUri: hello-world/
-  Handler: hello-world
   Runtime: go1.x
  1. 作成プロジェクト直下のhello-worldフォルダ名を変更
    hello-world => bitflyer

  2. go.modのモジュール名を変更

bitflyer/go.mod
+ module bitflyer
- module hello-world

実装

構成のイメージです。

|--bitflyer
|  |--api
|  |  |--market_api.go
|  |--awslib
|  |  |--s3.go
|  |  |--systemManager.go
|  |--go.mod
|  |--go.sum
|  |--main.go
|  |--utils
|  |  |--http.utils.go

エントリーポイント

main.goの実装
main.go
package main

import (
	"bitflyer/api"
	"bitflyer/awslib"
	"bitflyer/utils"
	"encoding/json"
	"fmt"

	"github.com/aws/aws-lambda-go/lambda"
)

func handler() {
	// 各種通貨のコードを取得
	markets, err := api.GetMarkets()
	if err != nil {
		fmt.Printf("Error getting marks: %s\n", err)
		return
	}

	var tickers []api.Ticker
	for _, market := range markets {
		// 通貨ごとのTickerを取得
		ticker, err := api.GetTicker(market.ProductCode)
		if err != nil {
			fmt.Printf("Error getting ticker: %s\n", err)
			return
		}
		tickers = append(tickers, *ticker)
	}

	jsonTickers, err := json.Marshal(tickers)
	if err != nil {
		fmt.Printf("Error marshaling ticker data: %s\n", err)
		return
	}

	// フォルダとオブジェクトキーの作成
	now := utils.GetTimeNow()
	folderPath := fmt.Sprintf("%d/%02d/%02d", now.Year(), now.Month(), now.Day())            // S3への保存パス
	objectKey := folderPath + fmt.Sprintf("/%02d%02d_ticker.json", now.Hour(), now.Minute()) // S3への保存ファイル名

	// バケット名の取得
	bucketName, err := awslib.GetParameterStore("bitflyer-s3-bucket")
	if err != nil {
		fmt.Printf("Error getting parameter store: %s\n", err)
		return
	}

	err = awslib.SaveToS3(jsonTickers, bucketName, objectKey)
	if err != nil {
		fmt.Printf("Error saving ticker data to S3: %s\n", err)
		return
	}

	fmt.Println("Ticker data saved to S3 successfully.")
}

func main() {
	lambda.Start(handler)
}

bitflyer提供のAPI呼び出し

market_api.goの実装
api/market_api.go
package api

import (
	"bitflyer/utils"
	"encoding/json"
)

type Market struct {
	ProductCode string `json:"product_code"`
	MarketType  string `json:"market_type"`
}

type Ticker struct {
	ProductCode     string  `json:"product_code"`
	State           string  `json:"state"`
	TimeStamp       string  `json:"timestamp"`
	TickID          int     `json:"tick_id"`
	BestBid         float64 `json:"best_bid"`
	BestAsk         float64 `json:"best_ask"`
	BestBidSize     float64 `json:"best_bid_size"`
	BestAskSize     float64 `json:"best_ask_size"`
	TotalBidDepth   float64 `json:"total_bid_depth"`
	TotalAskDepth   float64 `json:"total_ask_depth"`
	Ltp             float64 `json:"ltp"`
	Volume          float64 `json:"volume"`
	VolumeByProduct float64 `json:"volume_by_product"`
}

var (
	// マーケット一覧の取得URL
	MarketsURL = "https://api.bitflyer.com//v1/markets"

	// Tickerの取得URL
	TickerURL = "https://api.bitflyer.com/v1/ticker"
)

// マーケット一覧を取得します
func GetMarkets() ([]Market, error) {
	res, err := utils.HttpRequest("GET", MarketsURL, map[string]string{})
	if err != nil {
		return nil, err
	}

	var markets []Market
	err = json.Unmarshal(res, &markets)
	if err != nil {
		return nil, err
	}
	return markets, nil
}

// Ticker情報を取得します
func GetTicker(code string) (*Ticker, error) {
	res, err := utils.HttpRequest("GET", TickerURL, map[string]string{"product_code": code})
	if err != nil {
		return &Ticker{}, err
	}

	var ticker Ticker
	err = json.Unmarshal(res, &ticker)
	if err != nil {
		return &Ticker{}, err
	}
	return &ticker, nil
}

S3へのデータ保存

s3.goの実装
awslib/s3.go
package awslib

import (
	"bytes"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

// S3へのデータ保存
func SaveToS3(data []byte, bucketName, objectKey string) error {
	// AWSセッションの作成
	sess, err := session.NewSession(&aws.Config{
		Region: aws.String("ap-northeast-1"),
	})
	if err != nil {
		return err
	}

	// S3サービスクライアントの作成
	s3Client := s3.New(sess)

	// S3オブジェクトの作成とデータのアップロード
	_, err = s3Client.PutObject(&s3.PutObjectInput{
		Body:   aws.ReadSeekCloser(bytes.NewReader(data)),
		Bucket: aws.String(bucketName),
		Key:    aws.String(objectKey),
	})
	if err != nil {
		return err
	}

	return nil
}

AWS System Managerからパラメータストアを取得

systemManager.goの実装
bitflyer/systemManager.go
package awslib

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/ssm"
)

// System Managerからパラメータストアを取得する
func GetParameterStore(key string) (string, error) {
	sess := session.Must(session.NewSession())
	svc := ssm.New(
		sess,
		aws.NewConfig().WithRegion("ap-northeast-1"),
	)

	res, err := svc.GetParameter(&ssm.GetParameterInput{
		Name:           aws.String(key),
		WithDecryption: aws.Bool(true),
	})
	if err != nil {
		return "", err
	}

	return *res.Parameter.Value, nil
}

httpリクエスト処理

http.utils.go
bitflyer/http.utils.go
package utils

import (
	"io/ioutil"
	"net/http"
	"time"
)

func HttpRequest(method, url string, query map[string]string) ([]byte, error) {
	req, err := http.NewRequest(method, url, nil)
	if err != nil {
		return nil, err
	}

	q := req.URL.Query()
	for key, val := range query {
		q.Add(key, val)
	}
	req.URL.RawQuery = q.Encode()

	client := new(http.Client)
	res, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}
	return body, nil
}

func GetTimeNow() time.Time {
	// タイムゾーンを日本時間に設定
	jst := time.FixedZone("Asia/Tokyo", 9*60*60)

	// 日本の現在時間を取得
	now := time.Now().In(jst)
	return now
}

実行結果

実装が完了し、再度「sam build」と「sam deploy」コマンドを実行すると、5分ごとにJSONファイルがS3バケットに出力されていることを確認できます。

JSONファイルの中身が正しく出力されていることも確認できます。

株式会社ROBONの技術ブログ

Discussion