AWS Step Functions × DynamoDBでSlack承認フローを作った — waitForTaskTokenパターンの実装
AWS Step Functions × DynamoDBでSlack承認フローを作った — waitForTaskTokenパターンの実装
この記事について
X自動投稿エージェントで一番面白い部分は、Slackのボタンをクリックするだけで「承認・再生成・スキップ」を操作できるHuman-in-the-Loop(HITL)の仕組みだ。
Step FunctionsのwaitForTaskTokenという機能を使うと、24時間ワークフローを「止めて待つ」ことができる。承認されたらワークフローが再開し、そのまま自動投稿まで流れる。このパターンの実装を具体的に書く。
こんな人に向けて書いている:
- Step FunctionsでHITLを実装したい
- LambdaとStep FunctionsのwaitForTaskTokenパターンを知りたい
- DynamoDBを中間ストレージとしてTaskTokenを管理する方法を知りたい
読み終えたら得られること:
TaskToken発行 → DynamoDB保存 → Slack通知 → コールバック → Step Functions再開という一連の実装が理解できる。
HITLが必要な理由
LLMが生成した投稿をそのまま自動投稿するのは怖い。内容の事実確認ができないし、トーンがずれていることもある。「承認してから投稿する」フローを挟むことで、エージェントの自律性を保ちながら品質管理ができる。
TaskTokenパターンの仕組み
Step Functionsにはタスクが完了するまでワークフローを一時停止するwaitForTaskTokenという機能がある。
通常のLambda呼び出しはLambdaが返答したらすぐ次のステートに進む。waitForTaskTokenの場合は、Lambdaの処理が終わっても「外部からsend_task_successが呼ばれるまで」待ち続ける。タイムアウトは最大1年まで設定できる。
ステートマシンの定義:
"WaitForApproval": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
"Parameters": {
"FunctionName": "content-agent-notifier",
"Payload": {
"taskToken.$": "$$.Task.Token",
"drafts.$": "$.drafts",
"notionUrl.$": "$.notionUrl",
"theme.$": "$.theme",
"date.$": "$.date"
}
},
"TimeoutSeconds": 86400,
"Catch": [
{
"ErrorEquals": ["States.TaskTimedOut"],
"Next": "ApprovalTimedOut",
"ResultPath": "$.error"
}
],
"Next": "CheckApproval"
}
$$.Task.TokenでStep FunctionsがTaskTokenを自動生成してLambdaに渡す。TimeoutSeconds: 86400は24時間。24時間以内に承認されなければApprovalTimedOutステートに遷移してエラー終了する。
Notifier Lambda — TokenをDynamoDBに保存してSlack通知
Notifier Lambdaがやることは2つだ。TaskTokenをDynamoDBに保存して、SlackにURLボタン付きメッセージを送信する。
DynamoDBへのToken保存
def store_token(table_name: str, approval_id: str, task_token: str, drafts: list, date_str: str):
"""taskToken を DynamoDB に保存(24時間 TTL)"""
dynamodb = boto3.resource("dynamodb", region_name=REGION)
table = dynamodb.Table(table_name)
expires = int((datetime.now(JST) + timedelta(hours=24)).timestamp())
table.put_item(Item={
"approval_id": approval_id,
"task_token": task_token,
"drafts": json.dumps(drafts, ensure_ascii=False),
"date": date_str,
"expires_at": expires, # DynamoDB TTL
})
approval_idはUUIDで生成してSlackのURLに埋め込む。Tokenを直接URLに入れないのは、長いし安全面でも問題がある。expires_atはStep FunctionsのTimeoutSecondsと同じ24時間に設定している。TTLを設定することでDynamoDBが自動的に古いレコードを削除してくれる。
Slack Block Kitでボタンを作る
SlackのBlock Kitを使うと、クリックでURLに飛ぶボタンが作れる。
def approval_url(action: str, draft_index: int = 0, feedback: str = "") -> str:
params = {"id": approval_id, "action": action, "draft": draft_index}
if feedback:
params["feedback"] = feedback
return f"{approval_url_base}?{urllib.parse.urlencode(params)}"
# 承認ボタン(3案分)
{
"type": "button",
"text": {"type": "plain_text", "text": "✅ 案1 で投稿"},
"style": "primary",
"url": approval_url("approved", 0),
},
# 再生成ボタン(プリセット)
{
"type": "button",
"text": {"type": "plain_text", "text": "👥 非エンジニア向けに"},
"url": approval_url("revise", 0, "非エンジニア向けに書き直して..."),
},
# スキップボタン
{
"type": "button",
"text": {"type": "plain_text", "text": "❌ 投稿しない"},
"url": approval_url("skip", 0),
},
URLにaction・draft・approval_id(・必要に応じてfeedback)を含める。ボタンクリックでこのURLにブラウザが飛ぶ。
Approval Callback Lambda — TokenをStep Functionsに返す
SlackのボタンをクリックするとブラウザでコールバックURLが開く。API GatewayがこのリクエストをApproval Callback Lambdaに渡す。
def lambda_handler(event, context):
params = event.get("queryStringParameters") or {}
approval_id = params.get("id", "")
action = params.get("action", "")
draft_index = int(params.get("draft", 0))
feedback = params.get("feedback", "")
# DynamoDB から taskToken 取得
table = dynamodb.Table(table_name)
resp = table.get_item(Key={"approval_id": approval_id})
item = resp.get("Item")
if not item:
return html_response(404, "リンク期限切れ", "このリンクはすでに使用済みか、期限切れです。")
task_token = item["task_token"]
drafts = json.loads(item.get("drafts", "[]"))
# Step Functions に結果を返す
sfn = boto3.client("stepfunctions", region_name=REGION)
output = json.dumps({
"action": action,
"selectedDraft": drafts[draft_index]["draft"] if action == "approved" and drafts else "",
"draftIndex": draft_index,
"feedback": feedback,
})
try:
sfn.send_task_success(taskToken=task_token, output=output)
table.delete_item(Key={"approval_id": approval_id}) # 使用済みTokenを削除
except sfn.exceptions.TaskTimedOut:
return html_response(410, "タイムアウト", "ワークフローがタイムアウトしました。")
send_task_successを呼ぶとStep Functionsのワークフローが再開する。outputに含めた情報がそのまま次のステート(CheckApproval)に渡される。
DynamoDBからTokenを取得して使用後に削除することで、同じリンクが2回使われるのを防いでいる(タイムアウト後にURLが再利用されるケースへの対策)。
ユーザーにHTMLページを返す
コールバックLambdaはHTMLをレスポンスとして返す。ブラウザで「承認済み」などのメッセージが表示される。
def html_response(status: int, title: str, message: str, emoji: str = "✅") -> dict:
body = f"""<!DOCTYPE html>
<html lang="ja">
<head>
<meta charset="UTF-8">
<title>{title}</title>
<style>
body {{ display: flex; justify-content: center; align-items: center; min-height: 100vh; }}
.card {{ background: white; border-radius: 16px; padding: 48px; text-align: center; }}
</style>
</head>
<body>
<div class="card">
<div style="font-size:64px">{emoji}</div>
<h1>{title}</h1>
<p>{message}</p>
</div>
</body>
</html>"""
return {"statusCode": status, "headers": {"Content-Type": "text/html; charset=utf-8"}, "body": body}
カスタム再生成フォーム
「もっと詳しく」「端的に」というプリセット以外に、自由に指示を入力できるHTMLフォームも実装した。Slackの「✍️ カスタム指示で再生成」ボタンをクリックすると、フォームページが開く。
def revise_form_response(approval_id: str, base_url: str) -> dict:
"""プリセットラジオボタン + 自由入力エリアのHTMLフォーム"""
presets = [
("非エンジニア向けに", "非エンジニア向けに書き直して..."),
("もっと詳しく", "もっと具体的に詳しく書いて..."),
("端的に", "もっと端的・簡潔に書いて..."),
]
# ... HTML生成
フォームのJavaScriptで、プリセット選択と自由入力を統合してURLパラメータに変換し、コールバックURLにリダイレクトする。
動作確認のポイント
実装で詰まりやすいのは以下の点だ。
Step Functionsの$$.Task.Tokenと$.Task.Tokenの違い:$$はコンテキストオブジェクト(Step Functions内部の情報)へのアクセスで、TaskTokenはここから取得する。$はペイロード(前のステートから渡されたデータ)へのアクセス。混同すると「TokenがLambdaに渡らない」という問題が起きる。
DynamoDBのTTLとsend_task_successのタイミング:TTLが先に切れてTokenが削除された状態でsend_task_successを呼ぶと「Task not found」になる。Step FunctionsのTimeoutSecondsとDynamoDBのTTLを同じ24時間に揃えることで、どちらかが先に切れるという問題を防いでいる。
まとめ
waitForTaskToken + DynamoDB + SlackボタンというHITLパターンは、実装量は多いがほぼすべての承認フローに転用できる。
フローをまとめると:Step FunctionsがTaskTokenを発行 → DynamoDBに保存 → SlackのURLにapproval_idを埋め込む → ユーザーがクリック → コールバックLambdaがTokenを取得してsend_task_successを呼ぶ → Step Functionsが再開。
承認フローが必要なエージェントを作る際のベースになるパターンとして使い回せる。
参考リンク:
Discussion