🚀

Lambda Durable Functionsを使って自動運用をよりスマートにしましょう

に公開

はじめに

2025年12月のAWS re:Inventで、待望の新機能「Lambda Durable Functions」が発表されました。これにより、従来のLambdaの15分実行制限を超えて、最大1年間のワークフローを単一のLambda関数で実行できるようになります。

従来、人間の承認待ちや長時間の処理が必要なワークフローでは、Step Functionsや複数のLambdaを組み合わせる必要がありました。Durable Functionsを使えば、シンプルなPythonコードだけでこれらを実現できます。

本記事では、実践的なユースケースとして「未使用EBSボリュームの自動クリーンアップ」を例に、Durable Functionsの魅力を解説します。

Lambda Durable Functionsとは

主な特徴

機能 説明
チェックポイント 各ステップの結果を自動保存。障害時も途中から再開
最大1年間の実行 人間の承認待ちなど、長期間のワークフローに対応
待機中は課金なし context.wait()中はコンピュート課金が発生しない
シンプルなコード Step Functionsのような複雑なJSON定義は不要

対応言語

  • Python: 3.13 / 3.14
  • JavaScript/TypeScript: Node.js 22 / 24

仕組み(チェックポイント/リプレイ)

Durable Functionsはチェックポイント/リプレイ方式で動作します:

  1. step()実行後、結果がチェックポイントとして保存される
  2. 関数が中断・再開されると、コードは最初から実行される
  3. しかし、完了済みのステップはスキップされ、保存済みの結果が返される
  4. これにより、障害時も確実に途中から再開できる
┌─────────────────────────────────────────────────────────────────┐
│                    Durable Execution                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  初回実行                         再開時(リプレイ)             │
│  ─────────                       ──────────────────             │
│                                                                 │
│  ┌─────────────┐                 ┌─────────────┐               │
│  │  Step 1     │ ──実行──→      │  Step 1     │ ──スキップ    │
│  └──────┬──────┘   ↓保存         └──────┬──────┘   (結果復元) │
│         ↓                               ↓                       │
│  ┌─────────────┐                 ┌─────────────┐               │
│  │  wait()     │ ──一時停止→    │  wait()     │ ──スキップ    │
│  └──────┬──────┘   (課金なし)   └──────┬──────┘               │
│         ↓                               ↓                       │
│  ┌─────────────┐                 ┌─────────────┐               │
│  │  Step 2     │                 │  Step 2     │ ──ここから実行│
│  └─────────────┘                 └─────────────┘               │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

主要なAPI

context.step() - チェックポイント付き実行

ビジネスロジックを実行し、結果を自動的にチェックポイントとして保存します。

# 完了後、結果が保存される。再実行時はスキップされる
result = context.step(
    lambda _: call_external_api(),
    name='call-api'
)

context.wait() - コスト0の待機

指定時間待機します。待機中はコンピュート課金が発生しません

# 1時間待機
context.wait(3600)

context.wait_for_callback() - 外部イベント待機

外部システム(Slack承認、Webhook等)からのコールバックを待機します。

approval = context.wait_for_callback(
    lambda callback_id, ctx: send_slack_notification(callback_id),
    name='wait-approval',
    config=WaitForCallbackConfig(timeout=Duration.from_days(7))  # 7日間待機
)
# 結果はJSON文字列として返されるため、json.loads()でパースが必要

context.parallel() / context.map() - 並列実行

複数の処理を並列実行します。

results = context.parallel([
    lambda ctx: ctx.step(task_a, name='task-a'),
    lambda ctx: ctx.step(task_b, name='task-b'),
], name='parallel-tasks')

実践例: 未使用EBS自動クリーンアップ

ユースケース

AWSを長期間運用していると、EC2インスタンスを削除してもEBSボリュームが残り続けることがあります。これらの「孤児EBS」は無駄なコストの原因になります。

従来の課題:

  • 定期的に手動でチェックする必要がある
  • 誤って重要なボリュームを削除するリスク
  • 承認フローを組み込むにはStep Functionsが必要

Durable Functionsでの解決:

  • 定期スキャン → Slack通知 → 承認待機 → 自動削除を1つのLambda関数で実現

アーキテクチャ

┌──────────────────────────────────────────────────────────────────────┐
│                         AWS Account                                  │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌─────────────┐     ┌──────────────────────────────────────────┐   │
│  │ EventBridge │────▶│     Lambda (Durable Function)            │   │
│  │ 毎週月曜9:00 │     │                                          │   │
│  └─────────────┘     │  ┌────────────────────────────────────┐  │   │
│                      │  │ Step 1: 未使用EBSスキャン          │  │   │
│                      │  └─────────────┬──────────────────────┘  │   │
│  ┌─────────────┐     │                ↓                         │   │
│  │    Slack    │◀────│  ┌────────────────────────────────────┐  │   │
│  │             │     │  │ Step 2: Slack通知 + 承認待機       │  │   │
│  └──────┬──────┘     │  └─────────────┬──────────────────────┘  │   │
│         │            │                ↓                         │   │
│         │            │  ┌────────────────────────────────────┐  │   │
│  ┌──────▼──────┐     │  │ Step 3: 承認時 → EBS削除           │  │   │
│  │  Callback   │────▶│  └────────────────────────────────────┘  │   │
│  │  Handler    │     │                                          │   │
│  └─────────────┘     └──────────────────────────────────────────┘   │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘

ファイル構成

コードは責務ごとに4つのファイルに分割しています。2つのLambda関数は同じCodeUriを共有しつつ、異なるHandlerを指定します。

aws-lambda-durable-functions/
├── main.py                # ワークフロー本体(lambda_handler)
├── slack_handler.py       # Slackコールバック(lambda_handler)
├── ebs.py                 # EBS操作(scan, delete)
├── slack.py               # Slack通知(send_notification, send_completion_notification)
├── template.yaml
├── requirements.txt
└── samconfig.toml

実装コード

EBS操作モジュール(ebs.py)

EBSボリュームのスキャンと削除を担当します。

import boto3

ec2 = boto3.client('ec2')


def scan_unused_volumes(_=None):
    """アタッチされていないEBSボリュームを検索"""
    response = ec2.describe_volumes(
        Filters=[{'Name': 'status', 'Values': ['available']}]
    )

    unused_volumes = []
    for volume in response['Volumes']:
        name = 'N/A'
        for tag in volume.get('Tags', []):
            if tag['Key'] == 'Name':
                name = tag['Value']
                break

        unused_volumes.append({
            'id': volume['VolumeId'],
            'size': volume['Size'],
            'name': name,
            'created': volume['CreateTime'].isoformat(),
            'az': volume['AvailabilityZone'],
            'volume_type': volume['VolumeType']
        })

    return unused_volumes


def delete_volumes(volumes):
    """EBSボリュームを削除"""
    deleted = []
    for volume in volumes:
        try:
            ec2.delete_volume(VolumeId=volume['id'])
            deleted.append(volume['id'])
            print(f"Deleted volume: {volume['id']}")
        except Exception as e:
            print(f"Failed to delete {volume['id']}: {e}")
    return deleted

Slack通知モジュール(slack.py)

Slack Block Kit通知の送信と完了通知を担当します。
検証のためSlack Bot Tokenは環境変数から取得していますが、ちゃんとした運用ではSecrets Manager等を利用してください。

import os
import json
import urllib.request
from datetime import datetime, timedelta

SLACK_BOT_TOKEN = os.environ.get('SLACK_BOT_TOKEN')
SLACK_CHANNEL = os.environ.get('SLACK_CHANNEL')
WAIT_DAYS = int(os.environ.get('WAIT_DAYS', 7))


def send_notification(volumes, callback_id):
    """Slack Block Kitメッセージを送信"""
    volume_list = "\n".join([
        f"• `{v['id']}` ({v['size']}GB, {v['volume_type']}) - {v['name']}"
        for v in volumes
    ])

    total_size = sum(v['size'] for v in volumes)
    estimated_monthly_cost = total_size * 0.08

    blocks = [
        {
            "type": "header",
            "text": {"type": "plain_text", "text": "🔍 未使用EBSボリュームが見つかりました"}
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": (
                    f"*{len(volumes)}個*の未使用EBSボリュームを検出しました\n"
                    f"合計サイズ: *{total_size}GB*\n"
                    f"推定月額コスト: *${estimated_monthly_cost:.2f}*"
                )
            }
        },
        {"type": "divider"},
        {
            "type": "section",
            "text": {"type": "mrkdwn", "text": f"*対象ボリューム:*\n{volume_list}"}
        },
        {
            "type": "context",
            "elements": [{
                "type": "mrkdwn",
                "text": f"⏰ 承認期限: {(datetime.now() + timedelta(days=WAIT_DAYS)).strftime('%Y-%m-%d %H:%M')}"
            }]
        },
        {
            "type": "actions",
            "elements": [
                {
                    "type": "button",
                    "text": {"type": "plain_text", "text": "✅ 削除を承認"},
                    "style": "primary",
                    "action_id": "approve_delete",
                    "value": callback_id
                },
                {
                    "type": "button",
                    "text": {"type": "plain_text", "text": "❌ キャンセル"},
                    "style": "danger",
                    "action_id": "reject_delete",
                    "value": callback_id
                }
            ]
        }
    ]

    payload = json.dumps({"channel": SLACK_CHANNEL, "blocks": blocks}).encode('utf-8')

    req = urllib.request.Request(
        "https://slack.com/api/chat.postMessage",
        data=payload,
        headers={
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {SLACK_BOT_TOKEN}'
        }
    )
    urllib.request.urlopen(req)


def send_completion_notification(deleted_count, approved_by):
    """削除完了通知を送信"""
    text = f"✅ *{deleted_count}個*のEBSボリュームを削除しました\n承認者: {approved_by}"

    payload = json.dumps({"channel": SLACK_CHANNEL, "text": text}).encode('utf-8')

    req = urllib.request.Request(
        "https://slack.com/api/chat.postMessage",
        data=payload,
        headers={
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {SLACK_BOT_TOKEN}'
        }
    )
    urllib.request.urlopen(req)

メインワークフロー(main.py)

Durable Functionsのワークフロー本体です。ebs.pyslack.pyをimportして4ステップのフローを構成します。

import json
from datetime import datetime
from aws_durable_execution_sdk_python import DurableContext, durable_execution
from aws_durable_execution_sdk_python.config import WaitForCallbackConfig, Duration

from ebs import scan_unused_volumes, delete_volumes
from slack import send_notification, send_completion_notification, WAIT_DAYS


@durable_execution
def lambda_handler(event, context: DurableContext):
    """未使用EBSボリュームの自動クリーンアップワークフロー"""

    # Step 1: 未使用EBSボリュームをスキャン
    unused_volumes = context.step(scan_unused_volumes, name='scan-ebs')

    if not unused_volumes:
        return {'status': 'no_action', 'message': '未使用のEBSボリュームは見つかりませんでした'}

    # Step 2: Slackに通知を送信し、承認を待機
    def request_approval(callback_id, callback_context):
        send_notification(unused_volumes, callback_id)

    approval = context.wait_for_callback(
        request_approval,
        name='wait-for-approval',
        config=WaitForCallbackConfig(timeout=Duration.from_days(WAIT_DAYS))  # 7日間
    )

    # Step 3: 承認結果に応じて処理
    if isinstance(approval, str):
        approval = json.loads(approval)

    if approval and approval.get('approved'):
        approved_by = approval.get('approved_by', 'unknown')

        # Step 3a: EBS削除
        deleted_volumes = context.step(
            lambda _: delete_volumes(unused_volumes),
            name='delete-volumes'
        )

        # Step 3b: 完了通知
        context.step(
            lambda _: send_completion_notification(len(deleted_volumes), approved_by),
            name='notify-completion'
        )

        return {
            'status': 'deleted',
            'deleted_count': len(deleted_volumes),
            'volumes': deleted_volumes,
            'approved_by': approved_by
        }

    reason = approval.get('reason', 'timeout') if approval else 'timeout'
    return {
        'status': 'skipped',
        'reason': reason,
        'volumes': [v['id'] for v in unused_volumes]
    }

Slackコールバックハンドラー(slack_handler.py)

Slackのボタンがクリックされると、このハンドラーが呼ばれます。ボタンのvalueにはDurable FunctionsのコールバックIDが格納されており、boto3のsend_durable_execution_callback_success APIでワークフローを再開します。

import json
import os
import urllib.request
import urllib.parse
import boto3

lambda_client = boto3.client('lambda')
SLACK_BOT_TOKEN = os.environ.get('SLACK_BOT_TOKEN')
SLACK_CHANNEL = os.environ.get('SLACK_CHANNEL')


def lambda_handler(event, context):
    """Slackインタラクティブボタンのコールバック処理"""

    try:
        body = event.get('body', '')
        if event.get('isBase64Encoded'):
            import base64
            body = base64.b64decode(body).decode('utf-8')

        parsed_body = urllib.parse.parse_qs(body)
        payload = json.loads(parsed_body.get('payload', ['{}'])[0])

        action = payload.get('actions', [{}])[0]
        action_id = action.get('action_id')
        callback_id = action.get('value')
        user_name = payload.get('user', {}).get('name', 'unknown')

        # コールバックデータを作成
        if action_id == 'approve_delete':
            response_data = {'approved': True, 'approved_by': user_name}
            message = f"✅ {user_name}さんが削除を承認しました。処理を開始します..."
        elif action_id == 'reject_delete':
            response_data = {'approved': False, 'reason': 'rejected_by_user', 'rejected_by': user_name}
            message = f"❌ {user_name}さんが削除をキャンセルしました。"
        else:
            return {'statusCode': 400, 'body': json.dumps({'error': f'Unknown action: {action_id}'})}

        # Lambda Durable FunctionsのコールバックAPIで結果を送信
        # 失敗時はexceptionが発生し、Slack通知はスキップされる
        lambda_client.send_durable_execution_callback_success(
            CallbackId=callback_id,
            Result=json.dumps(response_data).encode('utf-8')
        )

        # コールバック成功時のみSlackに確認メッセージを送信
        send_slack_message(message)

        return {'statusCode': 200, 'body': json.dumps({'status': 'ok'})}

    except Exception as e:
        print(f"Error: {e}")
        # コールバック失敗時はSlackに何も送らない(重複防止)
        return {'statusCode': 500, 'body': json.dumps({'error': 'Callback failed'})}


def send_slack_message(text):
    """Slackに新しいメッセージを送信"""
    payload = json.dumps({"channel": SLACK_CHANNEL, "text": text}).encode('utf-8')

    req = urllib.request.Request(
        "https://slack.com/api/chat.postMessage",
        data=payload,
        headers={
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {SLACK_BOT_TOKEN}'
        }
    )
    urllib.request.urlopen(req)

SAMテンプレート(template.yaml)

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: EBS Cleanup Workflow with Lambda Durable Functions

Parameters:
  SlackBotToken:
    Type: String
    Description: Slack Bot User OAuth Token (xoxb-...)
    NoEcho: true

  SlackChannel:
    Type: String
    Description: Slack通知先チャンネルID

  ScheduleExpression:
    Type: String
    Default: "cron(0 0 ? * MON *)"
    Description: EventBridgeスケジュール式(デフォルト: 毎週月曜 UTC 0:00)

  WaitDays:
    Type: Number
    Default: 7
    Description: 承認待機日数(最大365日)
    MinValue: 1
    MaxValue: 365

Globals:
  Function:
    Timeout: 900
    Runtime: python3.13
    MemorySize: 256

Resources:
  # メインワークフロー関数
  EBSCleanupWorkflow:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./
      Handler: main.lambda_handler
      Description: 未使用EBS自動クリーンアップワークフロー
      AutoPublishAlias: live
      DurableConfig:
        ExecutionTimeout: 604800  # 7日間
        RetentionPeriodInDays: 30
      Environment:
        Variables:
          SLACK_BOT_TOKEN: !Ref SlackBotToken
          SLACK_CHANNEL: !Ref SlackChannel
          WAIT_DAYS: !Ref WaitDays
      Policies:
        - Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Action:
                - ec2:DescribeVolumes
                - ec2:DeleteVolume
              Resource: '*'
      Events:
        WeeklySchedule:
          Type: Schedule
          Properties:
            Schedule: !Ref ScheduleExpression
            Description: Weekly EBS cleanup check
            Enabled: true

  # Slackコールバックハンドラー
  SlackCallbackHandler:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./
      Handler: slack_handler.lambda_handler
      Description: Slackボタンのコールバック処理
      Timeout: 30
      MemorySize: 128
      Environment:
        Variables:
          SLACK_BOT_TOKEN: !Ref SlackBotToken
          SLACK_CHANNEL: !Ref SlackChannel
      Policies:
        - Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Action:
                - lambda:SendDurableExecutionCallbackSuccess
                - lambda:SendDurableExecutionCallbackFailure
              Resource: "*"
      FunctionUrlConfig:
        AuthType: NONE
        Cors:
          AllowOrigins:
            - '*'
          AllowMethods:
            - POST
          AllowHeaders:
            - Content-Type

Outputs:
  EBSCleanupWorkflowArn:
    Description: EBS Cleanup Workflow Function ARN
    Value: !GetAtt EBSCleanupWorkflow.Arn

  SlackCallbackUrl:
    Description: Slack Callback Handler URL
    Value: !GetAtt SlackCallbackHandlerUrl.FunctionUrl

前提条件

  • AWS SAM CLI 1.150.1以上(DurableConfig対応)
  • Python 3.13以上
# SAM CLIバージョン確認
sam --version

# macOSでのインストール/更新
brew install aws-sam-cli

Slack Appの準備

ボタンによる承認操作を実現するため、カスタムSlack Appの作成が必要です。

⚠️ Incoming Webhooksではなく、Bot Token + chat.postMessage APIを使用します。Incoming Webhooksで送信したメッセージはアプリに関連付けられないため、インタラクティブなボタンが動作しません。

1. Slack Appを作成

  • Slack API にアクセス
  • Create New AppFrom scratch を選択
  • App名(例: EBS Cleanup Bot)とワークスペースを指定

2. OAuth & Permissions を設定

  • 左メニューの OAuth & Permissions をクリック
  • Bot Token Scopeschat:write を追加

3. アプリをワークスペースにインストール

  • 左メニューの Install App をクリック
  • Install to Workspace をクリックして権限を承認
  • 表示される Bot User OAuth Tokenxoxb-...)をメモ

4. 通知先チャンネルにAppを追加

  • Slackで通知先チャンネルを開く
  • チャンネル名をクリック → インテグレーションアプリを追加 で作成したAppを追加
  • チャンネル名をクリック → 最下部に表示される チャンネルIDC01XXXXXXXX)をメモ

デプロイ手順

# 1. SAMビルド
sam build

# 2. デプロイ(初回)
sam deploy --guided \
  --parameter-overrides \
    SlackBotToken=xoxb-your-bot-token \
    SlackChannel=C01XXXXXXXX

デプロイ完了後、Outputsに表示される SlackCallbackUrl を使って Interactivity を設定します。

5. Interactivity を有効化(デプロイ後)

  • Slack Appの設定画面に戻り、左メニューの Interactivity & Shortcuts をクリック
  • InteractivityOn に切り替え
  • Request URL にデプロイ時に出力された SlackCallbackUrl を設定
  • Save Changes をクリック

動作確認

EBS Cleanup Slack Notification

従来手法との比較

観点 Step Functions Durable Functions
コードの書き方 JSON(ASL)で状態遷移を定義 通常のPythonコード
デバッグ CloudWatch + Step Functions Console 通常のLambdaログ
学習コスト ASL構文の習得が必要 既存のPython知識で対応可能
状態管理 外部で明示的に管理 SDK内部で自動管理
適用シーン 複雑な分岐・並列処理 シンプルな直列ワークフロー
コスト 状態遷移ごとに課金 Lambda実行時間のみ

どちらを選ぶべきか?

Durable Functionsが向いているケース:

  • 承認待ちなどシンプルなワークフロー
  • 既存のLambda関数を拡張したい
  • チームにStep Functionsの知見がない

Step Functionsが向いているケース:

  • 複雑な分岐・並列処理が必要
  • 視覚的なワークフロー設計が必要
  • 複数のAWSサービスを統合したい

まとめ

Lambda Durable Functionsにより、人間の承認を含む長時間ワークフローがシンプルなPythonコードで実現できるようになりました。

主なメリット:

  • 最大1年間の実行に対応
  • 待機中は課金なし
  • チェックポイントによる確実な再開
  • Step Functionsより低い学習コスト

今回紹介したEBSクリーンアップの例のように、運用自動化のハードルが大幅に下がります。ぜひ試してみてください。

参考リンク

Discussion