DynamoDBストリームをLambda x Golangで受けとる方法
実際のコードは以下に掲載していますので、ご自由にお使いください。
yu-croco/DynamodbStreamGolangLambda
経緯
DynamoDBで発生したデータ操作(INSERT/MODIFY/REMOVE)をトリガーにしてLambda(Golang)を動かしたいケースが有ったので、その際に調べたことをまとめました。
方針
DynamoDBへのデータ操作をトリガーにするには、DynamoDBストリームを使います。DynamoDBストリームの詳細に関しては以下が参考になると思います。
また、DynamoDBストリームによって受け渡されるデータ型はLambda側ではevents.DynamoDBEvent
として受け取れます。
詳細はいかが参考人あると思います。
実装
events.DynamoDBEvent
を受け取ると、以下のようなデータ構造を受け取れます(雰囲気を捉えやすいようにJSONで書いています)。
*JSONはチュートリアル: Amazon DynamoDB ストリームで AWS Lambda を使用するより拝借
{
"Records":[
{
"eventID":"1",
"eventName":"INSERT",
"eventVersion":"1.0",
"eventSource":"aws:dynamodb",
"awsRegion":"us-east-1",
"dynamodb":{
"Keys":{
"Id":{
"N":"101"
}
},
"NewImage":{
"Message":{
"S":"New item!"
},
"Id":{
"N":"101"
}
},
"SequenceNumber":"111",
"SizeBytes":26,
"StreamViewType":"NEW_AND_OLD_IMAGES"
},
"eventSourceARN":"stream-ARN"
}
]
}
events.DynamoDBEvent
でGolangの型としてすんなり取得できるのはNewImage
/OldImage
までです。
それより中身は map[string]events.DynamoDBAttributeValue
という型で表現されているので、ここから自分たちで用意したstructに変換する必要があります(必須ではないですが、このままデータ操作を行うのはかなり辛そう...)。
func Handler(request events.DynamoDBEvent) error {
for _, record := range request.Records {
// 型は map[string]events.DynamoDBAttributeValue
newImage := record.Change.NewImage
oldImage := record.Change.OldImage
...
}
...
}
map[string]events.DynamoDBAttributeValue
をstructに変換するのはチョット面倒です。
方法としては、map[string]events.DynamoDBAttributeValue
を dynamodb.AttributeValue
に変換した上で、 dynamodbattribute.UnmarshalMap
で変換する感じです。
コードとしては以下のような感じです。
// events.DynamoDBEventをほぼそのままstructに変換する感じを想定
type DynamodbStream struct {
Records Records `json:"Records"`
}
type Records []Record
type Record struct {
Dynamodb DynamoDBImages `json:"dynamodb"`
EventName string `json:"eventName"`
}
type DynamoDBImages struct {
NewImage RecordImage `json:"NewImage"`
OldImage RecordImage `json:"OldImage"`
}
type RecordImage struct {
UserId int `json:"userId" dynamodbav:"userId"`
Age int `json:"age" dynamodbav:"age"`
Address string `json:"address" dynamodbav:"address"`
}
// map[string]events.DynamoDBAttributeValueを
// map[string]*dynamodb.AttributeValueに変換して
// structに変換してる
func unmarshalStreamImage(attribute map[string]events.DynamoDBAttributeValue) (model.RecordImage, error) {
dbAttrMap := make(map[string]*dynamodb.AttributeValue)
var image model.RecordImage
for idx, value := range attribute {
var dbAttr dynamodb.AttributeValue
bytes, marshalErr := value.MarshalJSON(); if marshalErr != nil {
return image, marshalErr
}
if unmarshalErr := json.Unmarshal(bytes, &dbAttr); unmarshalErr != nil {
return image, unmarshalErr
}
dbAttrMap[idx] = &dbAttr
}
if unmarshalErr := dynamodbattribute.UnmarshalMap(dbAttrMap, &image); unmarshalErr != nil {
return image, unmarshalErr
}
return image, nil
}
func RequestConvert(event events.DynamoDBEvent) (*model.DynamodbStream, error) {
var records model.Records
for _, e := range event.Records {
newImage, newImageErr := unmarshalStreamImage(e.Change.NewImage)
if newImageErr != nil {
return nil, newImageErr
}
oldImage, oldImageErr := unmarshalStreamImage(e.Change.OldImage)
if oldImageErr != nil {
return nil, oldImageErr
}
record := model.Record{
Dynamodb: model.DynamoDBImages{
NewImage: newImage,
OldImage: oldImage,
},
EventName: e.EventName,
}
records = append(records, record)
}
events := model.DynamodbStream{Records: records}
return &events, nil
}
ちょっと面倒ですがこれでDynamoDBストリームから流れてきたデータを変換できます。
Discussion