Chapter 12

Lambda EventBridge(CloudWatchEvent) boto3

hello_yogurt
hello_yogurt
2022.07.23に更新

LambdaでEventBridge(CloudWatchEvent)をboto3を使って作成する

流れ
Cloudwatch_event.vue
↓ EventBridgeの設定を送信している(イベント名、cron、有効無効)

views.py
class CloudwatchEventView(View):
def post(self, request):
↓ views.pyではEventBridgeの設定をmodelに保存している

lambda test.py

test.py
import json
import boto3

def lambda_handler(event, context):
    item_id               = event['item_id']
    item_name             = event['item_name']
    item_url              = event['item_url']
    username               = event['username']
    cloudwatch_event_name = event['cloudwatch_event_name']
    minute                = event['minute']
    hour                  = event['hour']
    day                   = event['day']
    weekday               = event['weekday']
    state                 = event['state']
    
    print('item_id', item_id, sep="\n")
    print('event', event, sep="\n")
    
    if type(minute) == int:
        minute = str(minute)
        
    if type(hour) == int:
        hour = str(hour)
        
    if type(day) == int:
        day = str(day)
        
    if day == '*' and weekday == '*':
        weekday = '?'
        
    if state == "有効":
        state = "ENABLED"
    else:
        state = "DISABLED"

    client = boto3.client('events')

    event_name = cloudwatch_event_name
    event_time = "cron("+minute+" "+hour+" "+day+" * "+weekday+" *)"
    description = str(item_id)

    response = client.put_rule(
                        Name=event_name,
                        ScheduleExpression= event_time,
                        EventPattern='',
                        State=state,
                        Description=description)

    lambda_name = 'test_post'
    lambda_arn = 'arn:aws:lambda:ap-northeast-1:aws-id:function:test_post'
    params = {
        "item_id"               : item_id,
        "item_name"             : item_name,
        "item_url"              : item_url,
        "username"              : username,
        }
        
    params = json.dumps(params)
        
    response = client.put_targets(
        Rule=event_name,
        Targets=[
            {
                'Id': lambda_name,
                'Arn':  lambda_arn,
                'Input': params,
            }
        ]
    )


    return {
        'statusCode': 200,
        'headers': {
            'Access-Control-Allow-Headers': '*',
            'Access-Control-Allow-Origin': 'http://ドメイン',
            'Access-Control-Allow-Methods': 'OPTIONS,POST',
        },
        'body': json.dumps('Hello from Lambda!'),
        'item_id': item_id
        }

ApiGateway POST 総合リクエスト マッピングテンプレート
Content-Type
application/json

{
  "item_id"               : $input.json("$.item_id"),
  "item_name"             : $input.json("$.item_name"),
  "item_url"              : $input.json("$.item_url"),
  "username"              : $input.json("$.username"),
  "cloudwatch_event_name" : $input.json("$.cloudwatch_event_name"),
  "minute"                : $input.json("$.minute"),
  "hour"                  : $input.json("$.hour"),
  "day"                   : $input.json("$.day"),
  "weekday"               : $input.json("$.weekday"),
  "state"                 : $input.json("$.state")
}


これでlambda(test.py)がbackendのviews.pyから値を受けることができる


EventBrigeのTargetをlambda(test_post.py)にしている

ちなみに、lambda(test_post.py)にトリガーとしてEventBrigeを追加しないと動かない