💽

DynamoDBストリームをLambda x Golangで受けとる方法

2020/10/30に公開

実際のコードは以下に掲載していますので、ご自由にお使いください。
yu-croco/DynamodbStreamGolangLambda

経緯

DynamoDBで発生したデータ操作(INSERT/MODIFY/REMOVE)をトリガーにしてLambda(Golang)を動かしたいケースが有ったので、その際に調べたことをまとめました。

方針

DynamoDBへのデータ操作をトリガーにするには、DynamoDBストリームを使います。DynamoDBストリームの詳細に関しては以下が参考になると思います。

また、DynamoDBストリームによって受け渡されるデータ型はLambda側ではevents.DynamoDBEventとして受け取れます。
詳細はいかが参考人あると思います。

実装

events.DynamoDBEvent を受け取ると、以下のようなデータ構造を受け取れます(雰囲気を捉えやすいようにJSONで書いています)。
*JSONはチュートリアル: Amazon DynamoDB ストリームで AWS Lambda を使用するより拝借

{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ]
}

events.DynamoDBEvent でGolangの型としてすんなり取得できるのはNewImage/OldImageまでです。

それより中身は map[string]events.DynamoDBAttributeValue という型で表現されているので、ここから自分たちで用意したstructに変換する必要があります(必須ではないですが、このままデータ操作を行うのはかなり辛そう...)。

func Handler(request events.DynamoDBEvent) error {
	for _, record := range request.Records {
		// 型は map[string]events.DynamoDBAttributeValue
		newImage := record.Change.NewImage 
		oldImage := record.Change.OldImage
		...
	}
	...
}

map[string]events.DynamoDBAttributeValue をstructに変換するのはチョット面倒です。
方法としては、map[string]events.DynamoDBAttributeValuedynamodb.AttributeValue に変換した上で、 dynamodbattribute.UnmarshalMap で変換する感じです。

コードとしては以下のような感じです。

// events.DynamoDBEventをほぼそのままstructに変換する感じを想定
type DynamodbStream struct {
	Records Records `json:"Records"`
}

type Records []Record

type Record struct {
	Dynamodb  DynamoDBImages `json:"dynamodb"`
	EventName string         `json:"eventName"`
}

type DynamoDBImages struct {
	NewImage RecordImage `json:"NewImage"`
	OldImage RecordImage `json:"OldImage"`
}

type RecordImage struct {
	UserId int `json:"userId" dynamodbav:"userId"`
	Age    int `json:"age" dynamodbav:"age"`
	Address string `json:"address" dynamodbav:"address"`
}

// map[string]events.DynamoDBAttributeValueを
// map[string]*dynamodb.AttributeValueに変換して
// structに変換してる
func unmarshalStreamImage(attribute map[string]events.DynamoDBAttributeValue) (model.RecordImage, error) {
	dbAttrMap := make(map[string]*dynamodb.AttributeValue)
	var image model.RecordImage

	for idx, value := range attribute {
		var dbAttr dynamodb.AttributeValue

		bytes, marshalErr := value.MarshalJSON(); if marshalErr != nil {
			return image, marshalErr
		}

		if unmarshalErr := json.Unmarshal(bytes, &dbAttr); unmarshalErr != nil {
			return image, unmarshalErr
		}

		dbAttrMap[idx] = &dbAttr
	}

	if unmarshalErr := dynamodbattribute.UnmarshalMap(dbAttrMap, &image); unmarshalErr != nil {
		return image, unmarshalErr
	}

	return image, nil
}

func RequestConvert(event events.DynamoDBEvent) (*model.DynamodbStream, error) {
	var records model.Records
	for _, e := range event.Records {
		newImage, newImageErr := unmarshalStreamImage(e.Change.NewImage)
		if newImageErr != nil {
			return nil, newImageErr
		}

		oldImage, oldImageErr := unmarshalStreamImage(e.Change.OldImage)
		if oldImageErr != nil {
			return nil, oldImageErr
		}

		record := model.Record{
			Dynamodb: model.DynamoDBImages{
				NewImage: newImage,
				OldImage: oldImage,
			},
			EventName: e.EventName,
		}
		records = append(records, record)
	}

	events := model.DynamodbStream{Records: records}

	return &events, nil
}

ちょっと面倒ですがこれでDynamoDBストリームから流れてきたデータを変換できます。

Discussion