Lambda Durable Functionsを使って自動運用をよりスマートにしましょう
はじめに
2025年12月のAWS re:Inventで、待望の新機能「Lambda Durable Functions」が発表されました。これにより、従来のLambdaの15分実行制限を超えて、最大1年間のワークフローを単一のLambda関数で実行できるようになります。
従来、人間の承認待ちや長時間の処理が必要なワークフローでは、Step Functionsや複数のLambdaを組み合わせる必要がありました。Durable Functionsを使えば、シンプルなPythonコードだけでこれらを実現できます。
本記事では、実践的なユースケースとして「未使用EBSボリュームの自動クリーンアップ」を例に、Durable Functionsの魅力を解説します。
Lambda Durable Functionsとは
主な特徴
| 機能 | 説明 |
|---|---|
| チェックポイント | 各ステップの結果を自動保存。障害時も途中から再開 |
| 最大1年間の実行 | 人間の承認待ちなど、長期間のワークフローに対応 |
| 待機中は課金なし |
context.wait()中はコンピュート課金が発生しない |
| シンプルなコード | Step Functionsのような複雑なJSON定義は不要 |
対応言語
- Python: 3.13 / 3.14
- JavaScript/TypeScript: Node.js 22 / 24
仕組み(チェックポイント/リプレイ)
Durable Functionsはチェックポイント/リプレイ方式で動作します:
- 各
step()実行後、結果がチェックポイントとして保存される - 関数が中断・再開されると、コードは最初から実行される
- しかし、完了済みのステップはスキップされ、保存済みの結果が返される
- これにより、障害時も確実に途中から再開できる
┌─────────────────────────────────────────────────────────────────┐
│ Durable Execution │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 初回実行 再開時(リプレイ) │
│ ───────── ────────────────── │
│ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Step 1 │ ──実行──→ │ Step 1 │ ──スキップ │
│ └──────┬──────┘ ↓保存 └──────┬──────┘ (結果復元) │
│ ↓ ↓ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ wait() │ ──一時停止→ │ wait() │ ──スキップ │
│ └──────┬──────┘ (課金なし) └──────┬──────┘ │
│ ↓ ↓ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Step 2 │ │ Step 2 │ ──ここから実行│
│ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
主要なAPI
context.step() - チェックポイント付き実行
ビジネスロジックを実行し、結果を自動的にチェックポイントとして保存します。
# 完了後、結果が保存される。再実行時はスキップされる
result = context.step(
lambda _: call_external_api(),
name='call-api'
)
context.wait() - コスト0の待機
指定時間待機します。待機中はコンピュート課金が発生しません。
# 1時間待機
context.wait(3600)
context.wait_for_callback() - 外部イベント待機
外部システム(Slack承認、Webhook等)からのコールバックを待機します。
approval = context.wait_for_callback(
lambda callback_id, ctx: send_slack_notification(callback_id),
name='wait-approval',
config=WaitForCallbackConfig(timeout=Duration.from_days(7)) # 7日間待機
)
# 結果はJSON文字列として返されるため、json.loads()でパースが必要
context.parallel() / context.map() - 並列実行
複数の処理を並列実行します。
results = context.parallel([
lambda ctx: ctx.step(task_a, name='task-a'),
lambda ctx: ctx.step(task_b, name='task-b'),
], name='parallel-tasks')
実践例: 未使用EBS自動クリーンアップ
ユースケース
AWSを長期間運用していると、EC2インスタンスを削除してもEBSボリュームが残り続けることがあります。これらの「孤児EBS」は無駄なコストの原因になります。
従来の課題:
- 定期的に手動でチェックする必要がある
- 誤って重要なボリュームを削除するリスク
- 承認フローを組み込むにはStep Functionsが必要
Durable Functionsでの解決:
- 定期スキャン → Slack通知 → 承認待機 → 自動削除を1つのLambda関数で実現
アーキテクチャ
┌──────────────────────────────────────────────────────────────────────┐
│ AWS Account │
├──────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌──────────────────────────────────────────┐ │
│ │ EventBridge │────▶│ Lambda (Durable Function) │ │
│ │ 毎週月曜9:00 │ │ │ │
│ └─────────────┘ │ ┌────────────────────────────────────┐ │ │
│ │ │ Step 1: 未使用EBSスキャン │ │ │
│ │ └─────────────┬──────────────────────┘ │ │
│ ┌─────────────┐ │ ↓ │ │
│ │ Slack │◀────│ ┌────────────────────────────────────┐ │ │
│ │ │ │ │ Step 2: Slack通知 + 承認待機 │ │ │
│ └──────┬──────┘ │ └─────────────┬──────────────────────┘ │ │
│ │ │ ↓ │ │
│ │ │ ┌────────────────────────────────────┐ │ │
│ ┌──────▼──────┐ │ │ Step 3: 承認時 → EBS削除 │ │ │
│ │ Callback │────▶│ └────────────────────────────────────┘ │ │
│ │ Handler │ │ │ │
│ └─────────────┘ └──────────────────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────────┘
ファイル構成
コードは責務ごとに4つのファイルに分割しています。2つのLambda関数は同じCodeUriを共有しつつ、異なるHandlerを指定します。
aws-lambda-durable-functions/
├── main.py # ワークフロー本体(lambda_handler)
├── slack_handler.py # Slackコールバック(lambda_handler)
├── ebs.py # EBS操作(scan, delete)
├── slack.py # Slack通知(send_notification, send_completion_notification)
├── template.yaml
├── requirements.txt
└── samconfig.toml
実装コード
EBS操作モジュール(ebs.py)
EBSボリュームのスキャンと削除を担当します。
import boto3
ec2 = boto3.client('ec2')
def scan_unused_volumes(_=None):
"""アタッチされていないEBSボリュームを検索"""
response = ec2.describe_volumes(
Filters=[{'Name': 'status', 'Values': ['available']}]
)
unused_volumes = []
for volume in response['Volumes']:
name = 'N/A'
for tag in volume.get('Tags', []):
if tag['Key'] == 'Name':
name = tag['Value']
break
unused_volumes.append({
'id': volume['VolumeId'],
'size': volume['Size'],
'name': name,
'created': volume['CreateTime'].isoformat(),
'az': volume['AvailabilityZone'],
'volume_type': volume['VolumeType']
})
return unused_volumes
def delete_volumes(volumes):
"""EBSボリュームを削除"""
deleted = []
for volume in volumes:
try:
ec2.delete_volume(VolumeId=volume['id'])
deleted.append(volume['id'])
print(f"Deleted volume: {volume['id']}")
except Exception as e:
print(f"Failed to delete {volume['id']}: {e}")
return deleted
Slack通知モジュール(slack.py)
Slack Block Kit通知の送信と完了通知を担当します。
検証のためSlack Bot Tokenは環境変数から取得していますが、ちゃんとした運用ではSecrets Manager等を利用してください。
import os
import json
import urllib.request
from datetime import datetime, timedelta
SLACK_BOT_TOKEN = os.environ.get('SLACK_BOT_TOKEN')
SLACK_CHANNEL = os.environ.get('SLACK_CHANNEL')
WAIT_DAYS = int(os.environ.get('WAIT_DAYS', 7))
def send_notification(volumes, callback_id):
"""Slack Block Kitメッセージを送信"""
volume_list = "\n".join([
f"• `{v['id']}` ({v['size']}GB, {v['volume_type']}) - {v['name']}"
for v in volumes
])
total_size = sum(v['size'] for v in volumes)
estimated_monthly_cost = total_size * 0.08
blocks = [
{
"type": "header",
"text": {"type": "plain_text", "text": "🔍 未使用EBSボリュームが見つかりました"}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": (
f"*{len(volumes)}個*の未使用EBSボリュームを検出しました\n"
f"合計サイズ: *{total_size}GB*\n"
f"推定月額コスト: *${estimated_monthly_cost:.2f}*"
)
}
},
{"type": "divider"},
{
"type": "section",
"text": {"type": "mrkdwn", "text": f"*対象ボリューム:*\n{volume_list}"}
},
{
"type": "context",
"elements": [{
"type": "mrkdwn",
"text": f"⏰ 承認期限: {(datetime.now() + timedelta(days=WAIT_DAYS)).strftime('%Y-%m-%d %H:%M')}"
}]
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {"type": "plain_text", "text": "✅ 削除を承認"},
"style": "primary",
"action_id": "approve_delete",
"value": callback_id
},
{
"type": "button",
"text": {"type": "plain_text", "text": "❌ キャンセル"},
"style": "danger",
"action_id": "reject_delete",
"value": callback_id
}
]
}
]
payload = json.dumps({"channel": SLACK_CHANNEL, "blocks": blocks}).encode('utf-8')
req = urllib.request.Request(
"https://slack.com/api/chat.postMessage",
data=payload,
headers={
'Content-Type': 'application/json',
'Authorization': f'Bearer {SLACK_BOT_TOKEN}'
}
)
urllib.request.urlopen(req)
def send_completion_notification(deleted_count, approved_by):
"""削除完了通知を送信"""
text = f"✅ *{deleted_count}個*のEBSボリュームを削除しました\n承認者: {approved_by}"
payload = json.dumps({"channel": SLACK_CHANNEL, "text": text}).encode('utf-8')
req = urllib.request.Request(
"https://slack.com/api/chat.postMessage",
data=payload,
headers={
'Content-Type': 'application/json',
'Authorization': f'Bearer {SLACK_BOT_TOKEN}'
}
)
urllib.request.urlopen(req)
メインワークフロー(main.py)
Durable Functionsのワークフロー本体です。ebs.pyとslack.pyをimportして4ステップのフローを構成します。
import json
from datetime import datetime
from aws_durable_execution_sdk_python import DurableContext, durable_execution
from aws_durable_execution_sdk_python.config import WaitForCallbackConfig, Duration
from ebs import scan_unused_volumes, delete_volumes
from slack import send_notification, send_completion_notification, WAIT_DAYS
@durable_execution
def lambda_handler(event, context: DurableContext):
"""未使用EBSボリュームの自動クリーンアップワークフロー"""
# Step 1: 未使用EBSボリュームをスキャン
unused_volumes = context.step(scan_unused_volumes, name='scan-ebs')
if not unused_volumes:
return {'status': 'no_action', 'message': '未使用のEBSボリュームは見つかりませんでした'}
# Step 2: Slackに通知を送信し、承認を待機
def request_approval(callback_id, callback_context):
send_notification(unused_volumes, callback_id)
approval = context.wait_for_callback(
request_approval,
name='wait-for-approval',
config=WaitForCallbackConfig(timeout=Duration.from_days(WAIT_DAYS)) # 7日間
)
# Step 3: 承認結果に応じて処理
if isinstance(approval, str):
approval = json.loads(approval)
if approval and approval.get('approved'):
approved_by = approval.get('approved_by', 'unknown')
# Step 3a: EBS削除
deleted_volumes = context.step(
lambda _: delete_volumes(unused_volumes),
name='delete-volumes'
)
# Step 3b: 完了通知
context.step(
lambda _: send_completion_notification(len(deleted_volumes), approved_by),
name='notify-completion'
)
return {
'status': 'deleted',
'deleted_count': len(deleted_volumes),
'volumes': deleted_volumes,
'approved_by': approved_by
}
reason = approval.get('reason', 'timeout') if approval else 'timeout'
return {
'status': 'skipped',
'reason': reason,
'volumes': [v['id'] for v in unused_volumes]
}
Slackコールバックハンドラー(slack_handler.py)
Slackのボタンがクリックされると、このハンドラーが呼ばれます。ボタンのvalueにはDurable FunctionsのコールバックIDが格納されており、boto3のsend_durable_execution_callback_success APIでワークフローを再開します。
import json
import os
import urllib.request
import urllib.parse
import boto3
lambda_client = boto3.client('lambda')
SLACK_BOT_TOKEN = os.environ.get('SLACK_BOT_TOKEN')
SLACK_CHANNEL = os.environ.get('SLACK_CHANNEL')
def lambda_handler(event, context):
"""Slackインタラクティブボタンのコールバック処理"""
try:
body = event.get('body', '')
if event.get('isBase64Encoded'):
import base64
body = base64.b64decode(body).decode('utf-8')
parsed_body = urllib.parse.parse_qs(body)
payload = json.loads(parsed_body.get('payload', ['{}'])[0])
action = payload.get('actions', [{}])[0]
action_id = action.get('action_id')
callback_id = action.get('value')
user_name = payload.get('user', {}).get('name', 'unknown')
# コールバックデータを作成
if action_id == 'approve_delete':
response_data = {'approved': True, 'approved_by': user_name}
message = f"✅ {user_name}さんが削除を承認しました。処理を開始します..."
elif action_id == 'reject_delete':
response_data = {'approved': False, 'reason': 'rejected_by_user', 'rejected_by': user_name}
message = f"❌ {user_name}さんが削除をキャンセルしました。"
else:
return {'statusCode': 400, 'body': json.dumps({'error': f'Unknown action: {action_id}'})}
# Lambda Durable FunctionsのコールバックAPIで結果を送信
# 失敗時はexceptionが発生し、Slack通知はスキップされる
lambda_client.send_durable_execution_callback_success(
CallbackId=callback_id,
Result=json.dumps(response_data).encode('utf-8')
)
# コールバック成功時のみSlackに確認メッセージを送信
send_slack_message(message)
return {'statusCode': 200, 'body': json.dumps({'status': 'ok'})}
except Exception as e:
print(f"Error: {e}")
# コールバック失敗時はSlackに何も送らない(重複防止)
return {'statusCode': 500, 'body': json.dumps({'error': 'Callback failed'})}
def send_slack_message(text):
"""Slackに新しいメッセージを送信"""
payload = json.dumps({"channel": SLACK_CHANNEL, "text": text}).encode('utf-8')
req = urllib.request.Request(
"https://slack.com/api/chat.postMessage",
data=payload,
headers={
'Content-Type': 'application/json',
'Authorization': f'Bearer {SLACK_BOT_TOKEN}'
}
)
urllib.request.urlopen(req)
SAMテンプレート(template.yaml)
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: EBS Cleanup Workflow with Lambda Durable Functions
Parameters:
SlackBotToken:
Type: String
Description: Slack Bot User OAuth Token (xoxb-...)
NoEcho: true
SlackChannel:
Type: String
Description: Slack通知先チャンネルID
ScheduleExpression:
Type: String
Default: "cron(0 0 ? * MON *)"
Description: EventBridgeスケジュール式(デフォルト: 毎週月曜 UTC 0:00)
WaitDays:
Type: Number
Default: 7
Description: 承認待機日数(最大365日)
MinValue: 1
MaxValue: 365
Globals:
Function:
Timeout: 900
Runtime: python3.13
MemorySize: 256
Resources:
# メインワークフロー関数
EBSCleanupWorkflow:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./
Handler: main.lambda_handler
Description: 未使用EBS自動クリーンアップワークフロー
AutoPublishAlias: live
DurableConfig:
ExecutionTimeout: 604800 # 7日間
RetentionPeriodInDays: 30
Environment:
Variables:
SLACK_BOT_TOKEN: !Ref SlackBotToken
SLACK_CHANNEL: !Ref SlackChannel
WAIT_DAYS: !Ref WaitDays
Policies:
- Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- ec2:DescribeVolumes
- ec2:DeleteVolume
Resource: '*'
Events:
WeeklySchedule:
Type: Schedule
Properties:
Schedule: !Ref ScheduleExpression
Description: Weekly EBS cleanup check
Enabled: true
# Slackコールバックハンドラー
SlackCallbackHandler:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./
Handler: slack_handler.lambda_handler
Description: Slackボタンのコールバック処理
Timeout: 30
MemorySize: 128
Environment:
Variables:
SLACK_BOT_TOKEN: !Ref SlackBotToken
SLACK_CHANNEL: !Ref SlackChannel
Policies:
- Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- lambda:SendDurableExecutionCallbackSuccess
- lambda:SendDurableExecutionCallbackFailure
Resource: "*"
FunctionUrlConfig:
AuthType: NONE
Cors:
AllowOrigins:
- '*'
AllowMethods:
- POST
AllowHeaders:
- Content-Type
Outputs:
EBSCleanupWorkflowArn:
Description: EBS Cleanup Workflow Function ARN
Value: !GetAtt EBSCleanupWorkflow.Arn
SlackCallbackUrl:
Description: Slack Callback Handler URL
Value: !GetAtt SlackCallbackHandlerUrl.FunctionUrl
前提条件
- AWS SAM CLI 1.150.1以上(DurableConfig対応)
- Python 3.13以上
# SAM CLIバージョン確認
sam --version
# macOSでのインストール/更新
brew install aws-sam-cli
Slack Appの準備
ボタンによる承認操作を実現するため、カスタムSlack Appの作成が必要です。
⚠️ Incoming Webhooksではなく、Bot Token +
chat.postMessageAPIを使用します。Incoming Webhooksで送信したメッセージはアプリに関連付けられないため、インタラクティブなボタンが動作しません。
1. Slack Appを作成
- Slack API にアクセス
- Create New App → From scratch を選択
- App名(例:
EBS Cleanup Bot)とワークスペースを指定
2. OAuth & Permissions を設定
- 左メニューの OAuth & Permissions をクリック
-
Bot Token Scopes に
chat:writeを追加
3. アプリをワークスペースにインストール
- 左メニューの Install App をクリック
- Install to Workspace をクリックして権限を承認
- 表示される Bot User OAuth Token(
xoxb-...)をメモ
4. 通知先チャンネルにAppを追加
- Slackで通知先チャンネルを開く
- チャンネル名をクリック → インテグレーション → アプリを追加 で作成したAppを追加
- チャンネル名をクリック → 最下部に表示される チャンネルID(
C01XXXXXXXX)をメモ
デプロイ手順
# 1. SAMビルド
sam build
# 2. デプロイ(初回)
sam deploy --guided \
--parameter-overrides \
SlackBotToken=xoxb-your-bot-token \
SlackChannel=C01XXXXXXXX
デプロイ完了後、Outputsに表示される SlackCallbackUrl を使って Interactivity を設定します。
5. Interactivity を有効化(デプロイ後)
- Slack Appの設定画面に戻り、左メニューの Interactivity & Shortcuts をクリック
- Interactivity を On に切り替え
-
Request URL にデプロイ時に出力された
SlackCallbackUrlを設定 - Save Changes をクリック
動作確認

従来手法との比較
| 観点 | Step Functions | Durable Functions |
|---|---|---|
| コードの書き方 | JSON(ASL)で状態遷移を定義 | 通常のPythonコード |
| デバッグ | CloudWatch + Step Functions Console | 通常のLambdaログ |
| 学習コスト | ASL構文の習得が必要 | 既存のPython知識で対応可能 |
| 状態管理 | 外部で明示的に管理 | SDK内部で自動管理 |
| 適用シーン | 複雑な分岐・並列処理 | シンプルな直列ワークフロー |
| コスト | 状態遷移ごとに課金 | Lambda実行時間のみ |
どちらを選ぶべきか?
Durable Functionsが向いているケース:
- 承認待ちなどシンプルなワークフロー
- 既存のLambda関数を拡張したい
- チームにStep Functionsの知見がない
Step Functionsが向いているケース:
- 複雑な分岐・並列処理が必要
- 視覚的なワークフロー設計が必要
- 複数のAWSサービスを統合したい
まとめ
Lambda Durable Functionsにより、人間の承認を含む長時間ワークフローがシンプルなPythonコードで実現できるようになりました。
主なメリット:
- 最大1年間の実行に対応
- 待機中は課金なし
- チェックポイントによる確実な再開
- Step Functionsより低い学習コスト
今回紹介したEBSクリーンアップの例のように、運用自動化のハードルが大幅に下がります。ぜひ試してみてください。
Discussion