📘

NAT Gatewayの利用最適化: AWS Step Functionsで実現する効率的なワークフロー設計

2024/11/25に公開

はじめに

AWSでコスト最適化を検討する際、NAT Gatewayの運用は重要なポイントです。
本記事では、特定のタイミングだけNAT Gatewayを有効化し、不要時に削除する仕組みをAWS Step FunctionsとLambdaで実現する方法をご紹介します。

このアプローチは、リソースの利用頻度が低い場合や、コスト最適化を最優先したいユースケースに適しています。
一方で、常時利用が求められるシステムや、可用性が重視されるケースでは推奨されないため、ユースケースの特性を十分に理解したうえで採用をご検討ください。

本記事のユースケース

NAT Gatewayは、プライベートサブネット内のリソースがインターネットにアクセスする際に必要不可欠なAWSサービスです。
その高可用性と信頼性は、多くのワークロードで重要な役割を果たします。

一方で、NAT Gatewayは利用時間に関係なく一定の時間単位で料金が発生するため、利用頻度の低いユースケースではコスト最適化が課題となることがあります。

たとえば、データ処理やバッチ処理といったワークロードでは、インターネットアクセスが必要な時間が限られている場合があります。
このようなケースでは、NAT Gatewayを常時稼働させる設計が必ずしも適切とは限りません。
そこで、リソースの有効化と削除を自動化することで、コスト削減を図りながら必要なタイミングでアクセスを確保するという考え方も有効です。

本記事では、この課題を解決するために、AWS Step FunctionsとLambdaを活用してNAT Gatewayの作成、利用、削除を自動化する方法をご紹介します。
この仕組みを実装することで、より柔軟で効率的なリソース運用が可能になります。

事前準備

  • NAT Gatewayをデプロイするパブリックサブネット
  • プライベートサブネット
  • EIP (本記事では"lambda-eip"と命名)

本記事内に記載した参考コードは、各自のネットワーク構成に合わせて調整ください。

Step Functionsのワークフロー

以下の図は、NAT Gatewayの作成から削除までを自動化するステートマシンのワークフローを示しています。
各ステップの役割については後述します。

ステートマシン内では、3つのLambdaが動いています。
コード例は次のセクションで示します。ここでは挙動の解説を行います。

① Create Nat Gateway / Delete Nat Task (NAT Gatewayの作成と削除)

  • どちらも同じLambda関数を利用しています。渡された値により条件分岐します。
  • StepFunctionsのステートマシンがトリガーされると、NAT Gatewayを自動作成します。
  • NAT Gatewayは毎回同じリソース名・EIPを利用して立ち上がります。
  • NAT Gatewayが立ち上がったらルーティング設定も自動で行います。ステートマシンが起動中のみインターネット向けのルートが開通します。

(ステートマシンの中間処理・・・)

  • ステートマシンの中間処理では、データの変換や外部システムとのAPI連携を行うことが想定されます。
  • ステートマシン内での必要処理が終わったら、最後にルーティング設定とNAT Gatewayを削除します。

② CheckNatGatewayStatus (状態チェック)

  • 想定どおりにNAT Gateway作成やルーティング設定が行われているかチェック

③Run lambda Task (任意の処理実行)

  • 本記事用に用意したテストLambdaコード
  • こちらを必要な処理やサービスに置き換えてください。例えばプライベートサブネットにFargateタスクを起動させて実行するなどが考えられます。

コード例

① Lambda(Create Nat Gateway / Delete Nat Task)

import os
import boto3
from botocore.exceptions import ClientError

ec2 = boto3.client("ec2")

# 環境変数から取得
pub_sub_id = os.getenv('PUBLIC_SUB_ID')
private_sub1_id = os.getenv('PRIVATE_SUB1_ID')
private_sub2_id = os.getenv('PRIVATE_SUB2_ID')

def list_eip():
    try:
        res = ec2.describe_addresses(
            Filters=[{'Name': 'tag:Name', 'Values': ['lambda-eip']}]
        )
        if not res["Addresses"]:
            print("No EIP found with the tag 'lambda-eip'")
            return {"statusCode": 404, "body": "EIP not found"}
        return {"statusCode": 200, "body": res["Addresses"][0]["AllocationId"]}
    except ClientError as e:
        print(f"ClientError in list_eip: {e}")
        return {"statusCode": 500, "body": f"An error occurred: {e.response['Error']['Message']}"}

def start_ngw(eip, subnet_id):
    try:
        res = ec2.describe_nat_gateways(
            Filters=[{'Name': 'tag:Name', 'Values': ['lambda-ngw']}]
        )

        if not res["NatGateways"]:
            print("No existing NAT Gateway found. Creating...")
            ngw = ec2.create_nat_gateway(
                AllocationId=eip,
                SubnetId=subnet_id,
                TagSpecifications=[{
                    "ResourceType": "natgateway",
                    "Tags": [{"Key": "Name", "Value": "lambda-ngw"}],
                }],
            )
            nat_id = ngw["NatGateway"]["NatGatewayId"]
            ec2.get_waiter("nat_gateway_available").wait(NatGatewayIds=[nat_id])
            print(f"NAT Gateway created with ID: {nat_id}")
            return {"statusCode": 200, "body": nat_id}

        nat_gateway = res["NatGateways"][0]
        if nat_gateway["State"] == "available":
            print(f"Existing NAT Gateway available with ID: {nat_gateway['NatGatewayId']}")
            return {"statusCode": 500, "body": nat_gateway["NatGatewayId"]}
        
        else:
            # If NAT Gateway exists but not in available state, create a new NAT Gateway
            print("NAT Gateway exists but is not in 'available' state. Creating a new NAT Gateway...")
            ngw = ec2.create_nat_gateway(
                AllocationId=eip,
                SubnetId=subnet_id,
                TagSpecifications=[{
                    "ResourceType": "natgateway",
                    "Tags": [{"Key": "Name", "Value": "lambda-ngw"}],
                }],
            )
            nat_id = ngw["NatGateway"]["NatGatewayId"]
            ec2.get_waiter("nat_gateway_available").wait(NatGatewayIds=[nat_id])
            print(f"New NAT Gateway created with ID: {nat_id}")
            return {"statusCode": 200, "body": nat_id}

    except ClientError as e:
        print(f"ClientError in start_ngw: {e}")
        return {"statusCode": 500, "body": f"An error occurred: {e.response['Error']['Message']}"}


def attach_ngw_route(nat_gateway_id, subnet_id):
    try:
        response = ec2.describe_route_tables(
            Filters=[{'Name': 'association.subnet-id', 'Values': [subnet_id]}]
        )

        if not response["RouteTables"]:
            print(f"No route table found for subnet: {subnet_id}")
            return {"statusCode": 404, "body": f"No route table found for subnet {subnet_id}"}

        route_table_id = response["RouteTables"][0]["RouteTableId"]
        ec2.create_route(
            DestinationCidrBlock="0.0.0.0/0",
            NatGatewayId=nat_gateway_id,
            RouteTableId=route_table_id
        )
        print(f"Route to 0.0.0.0/0 added in route table {route_table_id} for NAT Gateway {nat_gateway_id}")
        return {"statusCode": 200, "body": f"Route added to route table {route_table_id}"}

    except ClientError as e:
        print(f"ClientError in attach_ngw_route for subnet {subnet_id}: {e}")
        return {"statusCode": 500, "body": f"An error occurred: {e.response['Error']['Message']}"}

def detach_ngw_route(subnet_id):
    try:
        response = ec2.describe_route_tables(
            Filters=[{'Name': 'association.subnet-id', 'Values': [subnet_id]}]
        )

        if not response["RouteTables"]:
            print(f"No route table found for subnet: {subnet_id}")
            return {"statusCode": 404, "body": f"No route table found for subnet {subnet_id}"}

        route_table_id = response["RouteTables"][0]["RouteTableId"]
        ec2.delete_route(DestinationCidrBlock="0.0.0.0/0", RouteTableId=route_table_id)
        print(f"Route to 0.0.0.0/0 removed from route table {route_table_id}")
        return {"statusCode": 200, "body": f"Route removed from route table {route_table_id}"}

    except ClientError as e:
        print(f"ClientError in detach_ngw_route for subnet {subnet_id}: {e}")
        return {"statusCode": 500, "body": f"An error occurred: {e.response['Error']['Message']}"}

def stop_ngw():
    try:
        res = ec2.describe_nat_gateways(
            Filters=[{'Name': 'tag:Name', 'Values': ['lambda-ngw']}, {'Name': 'state', 'Values': ['available']}]
        )

        if not res["NatGateways"]:
            print("No NAT Gateway found to delete.")
            return {"statusCode": 404, "body": "NAT Gateway not found"}

        nat_gateway_id = res["NatGateways"][0]["NatGatewayId"]
        ec2.delete_nat_gateway(NatGatewayId=nat_gateway_id)
        ec2.get_waiter("nat_gateway_deleted").wait(NatGatewayIds=[nat_gateway_id])
        print(f"NAT Gateway with ID {nat_gateway_id} deleted successfully.")
        return {"statusCode": 200, "body": f"NAT Gateway {nat_gateway_id} deleted successfully"}

    except ClientError as e:
        print(f"ClientError in stop_ngw: {e}")
        return {"statusCode": 500, "body": f"An error occurred: {e.response['Error']['Message']}"}

def lambda_handler(event, context):
    nat_type = event.get('Type')
    print(f"Operation requested: {nat_type}")

    if nat_type == "create-ngw":
        eip_response = list_eip()
        if eip_response["statusCode"] != 200:
            return eip_response

        nat_gateway_response = start_ngw(eip_response["body"], pub_sub_id)
        if nat_gateway_response["statusCode"] != 200:
            return nat_gateway_response

        # Attach routes to subnets
        for subnet_id in [private_sub1_id, private_sub2_id]:
            route_response = attach_ngw_route(nat_gateway_response["body"], subnet_id)
            if route_response["statusCode"] != 200:
                return route_response

    elif nat_type == "delete-ngw":
        for subnet_id in [private_sub1_id, private_sub2_id]:
            route_response = detach_ngw_route(subnet_id)
            if route_response["statusCode"] != 200:
                return route_response

        nat_gateway_response = stop_ngw()
        if nat_gateway_response["statusCode"] != 200:
            return nat_gateway_response

    else:
        print("Invalid operation type specified.")
        return {"statusCode": 400, "body": "Invalid operation type"}

    return {"statusCode": 200, "body": f"{nat_type} operation completed successfully"}

このコードは、AWS Lambdaを利用してNAT Gatewayの作成・削除を自動化するものです。
以下の主要な機能があります:

  • EIPの取得(list_eip)

  • NAT Gatewayの作成と状態管理(start_ngw)

    • 指定されたサブネットとEIPを使って、新しいNAT Gatewayを作成します。
    • 既存のNAT Gatewayが利用不可の場合は新しいNAT Gatewayを作成します。
  • ルートの追加・削除(attach_ngw_route / detach_ngw_route):

    • 指定されたサブネットのルートテーブルにインターネット接続のルートを追加・削除します。
  • NAT Gatewayの削除(stop_ngw)

  • Lambdaには3つの環境変数をセットします。
    • サブネットのIDを入力します。
  • またNAT Gatewayの作成に時間がかかるので、Timeoutの時間は余裕をもって設定ください。目安として5~6分あれば処理が通ります。

② Lambda(CheckNatGatewayStatus)

import boto3
import os
from botocore.exceptions import ClientError

ec2 = boto3.client('ec2')

def check_nat_gateway():
    """NAT Gatewayの存在を確認し、存在する場合はその状態を返す"""
    try:
        res = ec2.describe_nat_gateways(
            Filters=[
                {
                    'Name': 'tag:Name',
                    'Values': ['lambda-ngw']
                },
                {
                    "Name": "state",
                    "Values": ["available"]
                }
            ]
        )

        # NAT Gatewayが見つからない場合の対応
        if not res['NatGateways']:
            print("No NAT Gateway found with the specified tag and state.")
            return {
                "statusCode": 404,
                "body": "No available NAT Gateway found with the specified tag."
            }

        # NAT Gatewayが見つかった場合のステータスを返す
        nat_gateway_id = res['NatGateways'][0]['NatGatewayId']
        status = res['NatGateways'][0]['State']
        print(f"NAT Gateway {nat_gateway_id} is in state: {status}")
        
        return {
            "statusCode": 200,
            "body": {
                "status": status,
                "nat_gateway_id": nat_gateway_id
            }
        }

    except ClientError as e:
        print(f"An error occurred: {e}")
        return {
            "statusCode": 500,
            "body": f"An error occurred: {e.response['Error']['Message']}"
        }

    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return {
            "statusCode": 500,
            "body": f"An unexpected error occurred: {str(e)}"
        }

def check_ngw_route_exists(nat_gateway_id, subnet_id):
    """特定のルートテーブルに0.0.0.0/0のルート(NAT Gateway向け)が存在するか確認し、存在しない場合はエラーを返す"""
    try:
        response = ec2.describe_route_tables(
            Filters=[
                {
                    'Name': 'association.subnet-id',
                    'Values': [subnet_id]
                }
            ]
        )

        if not response["RouteTables"]:
            print(f"No route table found for subnet: {subnet_id}")
            return {
                "statusCode": 404,
                "body": f"No route table found for subnet: {subnet_id}"
            }

        route_table_id = response["RouteTables"][0]["RouteTableId"]

        # 0.0.0.0/0のルートが既に存在するか確認
        route_exists = any(
            route.get("DestinationCidrBlock") == "0.0.0.0/0" and route.get("NatGatewayId") == nat_gateway_id
            for route in response["RouteTables"][0]["Routes"]
        )

        if route_exists:
            print(f"Route to 0.0.0.0/0 via NAT Gateway already exists in route table {route_table_id}")
            return {
                "statusCode": 200,
                "body": f"Route exists in route table {route_table_id}"
            }
        else:
            print(f"No route to 0.0.0.0/0 via NAT Gateway found in route table {route_table_id}")
            return {
                "statusCode": 404,
                "body": f"No route to 0.0.0.0/0 in route table {route_table_id}"
            }

    except ClientError as e:
        print(f"An error occurred: {e}")
        return {
            "statusCode": 500,
            "body": f"An error occurred: {e.response['Error']['Message']}"
        }

    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return {
            "statusCode": 500,
            "body": f"An unexpected error occurred: {str(e)}"
        }

def lambda_handler(event, context):
    # NAT Gatewayの存在を確認
    nat_gateway_check = check_nat_gateway()
    if nat_gateway_check["statusCode"] != 200:
        return nat_gateway_check

    nat_gateway_id = nat_gateway_check["body"]["nat_gateway_id"]

    # サブネットIDを環境変数から取得
    subnet_ids = os.getenv("SUBNET_IDS", "").split(",")

    # 各サブネットに対してルートの存在を確認
    for subnet_id in subnet_ids:
        route_check = check_ngw_route_exists(nat_gateway_id, subnet_id)
        if route_check["statusCode"] != 200:
            return route_check

    return {
        "statusCode": 200,
        "body": "All routes are verified successfully."
    }
  • このコードは、NAT Gatewayとサブネットのルート設定が正しいことを検証するユーティリティとして利用できます。
  • NAT Gatewayの存在を確認し、利用可能な場合に各サブネットに対してルートの存在を確認します。

  • Lambdaの環境変数に、プライベートサブネットのIDを入力ください。本記事では2つのサブネットIDをコンマ区切りで入力する形にしています。

③Lambda(Run lambda Task)

import json

def lambda_handler(event, context):
    # TODO implement
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
  • 上記コードはテスト用です。
  • 必要な処理やサービスに置き換えてください。

StepFunctions ステートマシン JSON

{
  "Comment": "NAT Gateway Creation Workflow with Simplified Error Handling",
  "StartAt": "Create NAT Gateway",
  "States": {
    "Create NAT Gateway": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:xxxxxx",
      "Parameters": {
        "Type": "create-ngw"
      },
      "Next": "CheckNATGatewayResponse"
    },
    "CheckNATGatewayResponse": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.statusCode",
          "NumericEquals": 200,
          "Next": "WaitForNATGateway"
        }
      ],
      "Default": "FailState"
    },
    "WaitForNATGateway": {
      "Type": "Wait",
      "Seconds": 30,
      "Next": "CheckNATGatewayStatus"
    },
    "CheckNATGatewayStatus": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:xxxxxx",
      "ResultPath": "$",
      "Next": "NATGatewayReady?",
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "FailState"
        }
      ]
    },
    "NATGatewayReady?": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.statusCode",
          "NumericEquals": 200,
          "Next": "Run lambda Task"
        }
      ],
      "Default": "FailState"
    },
    "Run lambda Task": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:xxxxxx",
      "Next": "Delete Nat Task"
    },
    "Delete Nat Task": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:xxxxxx",
      "Parameters": {
        "Type": "delete-ngw"
      },
      "End": true
    },
    "FailState": {
      "Type": "Fail",
      "Error": "NATGatewayCreationFailed",
      "Cause": "NAT Gateway did not reach the ready state within the expected time or an error occurred."
    }
  }
}
  • 最後にStepFunctionsワークフローです。
  • 各Lambdaを自身の環境のLambda ARNに置き換えてください(xxxマークがある箇所)
  • Run Lambda Taskのステップを別サービスに置き換えたり、いくつかのステップを追加するなど、好きにカスタマイズしてください。

動作確認

  • ステートマシンにトリガーをかけると、NAT Gatewayの自動起動が始まります。
  • lambda-ngwという名前でNAT Gatewayのデプロイが行われます。

  • NAT Gateway作成完了後、ルーティング設定も自動で行われます。

  • NAT Gateway作成やルーティング設定が想定通りに進めば、「Run lambda Task」の処理が行われます。
  • このステップで好みの処理を行うイメージです。

  • 最後にNAT Gatewayを自動削除します。

  • このように必要なときのみNAT Gateway作成+ルーティング設定がなされます。
  • EventBridgeと連携したサーバレスワークフローとすることも可能です。

おわりに

本記事で紹介した仕組みは、特定の条件下でNAT Gatewayの運用コストを抑えるための一例です。
しかし、可用性や信頼性が求められる場合には、常時稼働の設計が適切です。

AWSはさまざまなユースケースに対応可能な柔軟なサービスを提供しています。
利用シナリオに応じて、最適な設計を御検討ください。

最後までお読みいただき ありがとうございました!

MEGAZONE株式会社 Tech Blog

Discussion