AWS Step Functions × DynamoDBでSlack承認フローを作った — waitForTaskTokenパターンの実装

に公開

AWS Step Functions × DynamoDBでSlack承認フローを作った — waitForTaskTokenパターンの実装

この記事について

X自動投稿エージェントで一番面白い部分は、Slackのボタンをクリックするだけで「承認・再生成・スキップ」を操作できるHuman-in-the-Loop(HITL)の仕組みだ。

Step FunctionsのwaitForTaskTokenという機能を使うと、24時間ワークフローを「止めて待つ」ことができる。承認されたらワークフローが再開し、そのまま自動投稿まで流れる。このパターンの実装を具体的に書く。

こんな人に向けて書いている:

  • Step FunctionsでHITLを実装したい
  • LambdaとStep FunctionsのwaitForTaskTokenパターンを知りたい
  • DynamoDBを中間ストレージとしてTaskTokenを管理する方法を知りたい

読み終えたら得られること:
TaskToken発行 → DynamoDB保存 → Slack通知 → コールバック → Step Functions再開という一連の実装が理解できる。


HITLが必要な理由

LLMが生成した投稿をそのまま自動投稿するのは怖い。内容の事実確認ができないし、トーンがずれていることもある。「承認してから投稿する」フローを挟むことで、エージェントの自律性を保ちながら品質管理ができる。


TaskTokenパターンの仕組み

Step Functionsにはタスクが完了するまでワークフローを一時停止するwaitForTaskTokenという機能がある。

通常のLambda呼び出しはLambdaが返答したらすぐ次のステートに進む。waitForTaskTokenの場合は、Lambdaの処理が終わっても「外部からsend_task_successが呼ばれるまで」待ち続ける。タイムアウトは最大1年まで設定できる。

ステートマシンの定義:

"WaitForApproval": {
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
  "Parameters": {
    "FunctionName": "content-agent-notifier",
    "Payload": {
      "taskToken.$": "$$.Task.Token",
      "drafts.$":    "$.drafts",
      "notionUrl.$": "$.notionUrl",
      "theme.$":     "$.theme",
      "date.$":      "$.date"
    }
  },
  "TimeoutSeconds": 86400,
  "Catch": [
    {
      "ErrorEquals": ["States.TaskTimedOut"],
      "Next": "ApprovalTimedOut",
      "ResultPath": "$.error"
    }
  ],
  "Next": "CheckApproval"
}

$$.Task.TokenでStep FunctionsがTaskTokenを自動生成してLambdaに渡す。TimeoutSeconds: 86400は24時間。24時間以内に承認されなければApprovalTimedOutステートに遷移してエラー終了する。


Notifier Lambda — TokenをDynamoDBに保存してSlack通知

Notifier Lambdaがやることは2つだ。TaskTokenをDynamoDBに保存して、SlackにURLボタン付きメッセージを送信する。

DynamoDBへのToken保存

def store_token(table_name: str, approval_id: str, task_token: str, drafts: list, date_str: str):
    """taskToken を DynamoDB に保存(24時間 TTL)"""
    dynamodb = boto3.resource("dynamodb", region_name=REGION)
    table    = dynamodb.Table(table_name)
    expires  = int((datetime.now(JST) + timedelta(hours=24)).timestamp())

    table.put_item(Item={
        "approval_id": approval_id,
        "task_token":  task_token,
        "drafts":      json.dumps(drafts, ensure_ascii=False),
        "date":        date_str,
        "expires_at":  expires,  # DynamoDB TTL
    })

approval_idはUUIDで生成してSlackのURLに埋め込む。Tokenを直接URLに入れないのは、長いし安全面でも問題がある。expires_atはStep FunctionsのTimeoutSecondsと同じ24時間に設定している。TTLを設定することでDynamoDBが自動的に古いレコードを削除してくれる。

Slack Block Kitでボタンを作る

SlackのBlock Kitを使うと、クリックでURLに飛ぶボタンが作れる。

def approval_url(action: str, draft_index: int = 0, feedback: str = "") -> str:
    params = {"id": approval_id, "action": action, "draft": draft_index}
    if feedback:
        params["feedback"] = feedback
    return f"{approval_url_base}?{urllib.parse.urlencode(params)}"

# 承認ボタン(3案分)
{
    "type": "button",
    "text": {"type": "plain_text", "text": "✅ 案1 で投稿"},
    "style": "primary",
    "url": approval_url("approved", 0),
},

# 再生成ボタン(プリセット)
{
    "type": "button",
    "text": {"type": "plain_text", "text": "👥 非エンジニア向けに"},
    "url": approval_url("revise", 0, "非エンジニア向けに書き直して..."),
},

# スキップボタン
{
    "type": "button",
    "text": {"type": "plain_text", "text": "❌ 投稿しない"},
    "url": approval_url("skip", 0),
},

URLにactiondraftapproval_id(・必要に応じてfeedback)を含める。ボタンクリックでこのURLにブラウザが飛ぶ。


Approval Callback Lambda — TokenをStep Functionsに返す

SlackのボタンをクリックするとブラウザでコールバックURLが開く。API GatewayがこのリクエストをApproval Callback Lambdaに渡す。

def lambda_handler(event, context):
    params      = event.get("queryStringParameters") or {}
    approval_id = params.get("id", "")
    action      = params.get("action", "")
    draft_index = int(params.get("draft", 0))
    feedback    = params.get("feedback", "")

    # DynamoDB から taskToken 取得
    table = dynamodb.Table(table_name)
    resp  = table.get_item(Key={"approval_id": approval_id})
    item  = resp.get("Item")
    if not item:
        return html_response(404, "リンク期限切れ", "このリンクはすでに使用済みか、期限切れです。")

    task_token = item["task_token"]
    drafts     = json.loads(item.get("drafts", "[]"))

    # Step Functions に結果を返す
    sfn    = boto3.client("stepfunctions", region_name=REGION)
    output = json.dumps({
        "action":        action,
        "selectedDraft": drafts[draft_index]["draft"] if action == "approved" and drafts else "",
        "draftIndex":    draft_index,
        "feedback":      feedback,
    })

    try:
        sfn.send_task_success(taskToken=task_token, output=output)
        table.delete_item(Key={"approval_id": approval_id})  # 使用済みTokenを削除
    except sfn.exceptions.TaskTimedOut:
        return html_response(410, "タイムアウト", "ワークフローがタイムアウトしました。")

send_task_successを呼ぶとStep Functionsのワークフローが再開する。outputに含めた情報がそのまま次のステート(CheckApproval)に渡される。

DynamoDBからTokenを取得して使用後に削除することで、同じリンクが2回使われるのを防いでいる(タイムアウト後にURLが再利用されるケースへの対策)。

ユーザーにHTMLページを返す

コールバックLambdaはHTMLをレスポンスとして返す。ブラウザで「承認済み」などのメッセージが表示される。

def html_response(status: int, title: str, message: str, emoji: str = "✅") -> dict:
    body = f"""<!DOCTYPE html>
<html lang="ja">
<head>
  <meta charset="UTF-8">
  <title>{title}</title>
  <style>
    body {{ display: flex; justify-content: center; align-items: center; min-height: 100vh; }}
    .card {{ background: white; border-radius: 16px; padding: 48px; text-align: center; }}
  </style>
</head>
<body>
  <div class="card">
    <div style="font-size:64px">{emoji}</div>
    <h1>{title}</h1>
    <p>{message}</p>
  </div>
</body>
</html>"""
    return {"statusCode": status, "headers": {"Content-Type": "text/html; charset=utf-8"}, "body": body}

カスタム再生成フォーム

「もっと詳しく」「端的に」というプリセット以外に、自由に指示を入力できるHTMLフォームも実装した。Slackの「✍️ カスタム指示で再生成」ボタンをクリックすると、フォームページが開く。

def revise_form_response(approval_id: str, base_url: str) -> dict:
    """プリセットラジオボタン + 自由入力エリアのHTMLフォーム"""
    presets = [
        ("非エンジニア向けに", "非エンジニア向けに書き直して..."),
        ("もっと詳しく", "もっと具体的に詳しく書いて..."),
        ("端的に", "もっと端的・簡潔に書いて..."),
    ]
    # ... HTML生成

フォームのJavaScriptで、プリセット選択と自由入力を統合してURLパラメータに変換し、コールバックURLにリダイレクトする。


動作確認のポイント

実装で詰まりやすいのは以下の点だ。

Step Functionsの$$.Task.Token$.Task.Tokenの違い:$$はコンテキストオブジェクト(Step Functions内部の情報)へのアクセスで、TaskTokenはここから取得する。$はペイロード(前のステートから渡されたデータ)へのアクセス。混同すると「TokenがLambdaに渡らない」という問題が起きる。

DynamoDBのTTLとsend_task_successのタイミング:TTLが先に切れてTokenが削除された状態でsend_task_successを呼ぶと「Task not found」になる。Step FunctionsのTimeoutSecondsとDynamoDBのTTLを同じ24時間に揃えることで、どちらかが先に切れるという問題を防いでいる。


まとめ

waitForTaskToken + DynamoDB + SlackボタンというHITLパターンは、実装量は多いがほぼすべての承認フローに転用できる。

フローをまとめると:Step FunctionsがTaskTokenを発行 → DynamoDBに保存 → SlackのURLにapproval_idを埋め込む → ユーザーがクリック → コールバックLambdaがTokenを取得してsend_task_successを呼ぶ → Step Functionsが再開。

承認フローが必要なエージェントを作る際のベースになるパターンとして使い回せる。

参考リンク:

Discussion