workflow型AIエージェントにおけるキルスイッチ設計案
はじめに
satto workspaceの技術責任者の田口(@yuki_taguchi)です。
昨今、AIエージェントの自律化と相互連携が急速に進展しています。
特にAIエージェントは、複雑なタスクを自動化できる一方で、制御不能な状態に陥るリスクも内包しています。
本稿では、satto shortcut時代にワークフロー型のAIエージェントを作っていた時の技術供養として
YAMLベースのworkflow型AIエージェントにおけるキルスイッチの実装と、安全な実行環境の構築について解説します。
workflow型のAIエージェントについて
workflow型のAIエージェントは、従来のAIエージェントとは根本的に異なるアプローチで実装されます。通常のAIエージェントが全体的な判断と実行をAIに委ねるのに対し、workflow型では実行フローを明確に定義し、AIの判断部分を限定的かつ明確に分離することで、完全な再現性と予測可能性を実現します[1]。
従来のAIエージェントとの決定的な違い
1. 完全な再現性の保証
# workflow型では各ステップが明確に定義される
steps:
- id: "capture_context_ocr"
name: "画面キャプチャの要点抽出"
tool: "vision.ocr"
action: "ExtractText"
enabled: true
when:
equals: ["{{ user_context.multimodal_signals.last_screenshot_ref | default('') != '' }}", true]
params:
image_ref: "{{ user_context.multimodal_signals.last_screenshot_ref }}"
top_k_keywords: 10
outputs:
- key: "ocr_keywords" # 出力も明確に定義
各ステップのツール、アクション、パラメータが事前に定義されるため、同じ入力に対して必ず同じ実行パスを通り、同じ結果を生成します。
2. AIの役割を限定した確実性
比較項目 | 従来のAIエージェント | workflow型のAIエージェント |
---|---|---|
実行フローの決定 | AIが全体のフローを判断 | 実行フローは完全に事前定義 |
AIの役割 | 全体的な制御と判断 | 特定ステップでの判断のみ |
実行経路 | 不透明(ブラックボックス) | 完全に可視化・追跡可能 |
結果の予測性 | 予測困難 | 高い予測可能性 |
エラー発生時 | 原因特定が困難 | ステップ単位で原因特定可能 |
テスタビリティ | End-to-Endテストが主 | ステップ単位での単体テスト可能 |
3. workflow間の連携による完全性
steps:
- id: "slack_notify"
name: "Slackへ共有通知"
tool: "slack.chat.post"
action: "PostMessage"
params:
channel: "#sales-team"
text: >
{{ user_context.profile.name }} さん、
「{{ vars.report_title }}」をDriveに保存しました。
{{ report_file_url }}
metadata:
category: "{{ metadata.category }}"
domain: "{{ metadata.domain }}"
apps: "{{ metadata.applications | join(',') }}"
outputs:
- key: "slack_message_ts"
AIのアクションで別のworkflowを呼び出すことで、行動・実行・結果が完全に保証された部品を組み合わせて複雑な処理を実現します。
期待通りに動作しない問題をほぼゼロにする仕組み
1. 入出力スキーマの厳格な定義
io:
inputs:
quarter:
type: "string"
enum: ["Q1","Q2","Q3","Q4"] # 許可される値を限定
default: "Q3"
fiscal_year:
type: "integer"
default: 2025
regions:
type: "array"
items: { type: "string" }
default: ["APAC","EMEA","AMER","JPN"]
outputs:
report_file_url: { type: "string", required: true }
drive_file_id: { type: "string" }
slack_message_ts: { type: "string" }
2. 実行制限とタイムアウト
execution_limits:
max_duration: "300s"
max_retries: 3
memory_limit: "2GB"
3. 明確なエラーハンドリング
on_error:
- match: "auth_*"
action: "prompt_reauth"
message: "接続の認可が失効しました。再認証してください。"
- match: "rate_limit"
action: "retry"
params: { max_attempts: 3, backoff: "exponential" }
- match: "*"
action: "fail_with_log"
redact_sensitive: true
これらの仕組みにより、AIエージェントが期待と異なる動作をする可能性を限りなくゼロに近づけ、本番環境での信頼性を飛躍的に向上させます。
YAMLでworkflowを管理する利点について
YAMLはworkflow定義において、以下の利点を提供します[2]:
1. 宣言的な構成管理
# workflow定義例
kind: Workflow
metadata:
title: "Q3 営業パフォーマンスレポート自動化"
name: "sales_q3_report_workflow"
category: "Reporting"
domain: "Sales"
applications: ["Salesforce CRM", "Microsoft Excel", "Google Drive", "Slack"]
steps:
- id: "crm_query"
name: "SalesforceからQ3売上データ取得"
tool: "salesforce.query"
action: "SOQL"
params:
query: >
SELECT Region__c, SUM(Amount) total
FROM Opportunity
WHERE IsClosed=true
AND CloseDate = THIS_{{ io.quarter }}
- id: "excel_prepare"
name: "Excelテンプレへ書き込み"
tool: "excel.graph"
action: "PopulateTemplate"
params:
template_ref: "{{ vars.template_ref }}"
2. 人間可読性とバージョン管理
- VCS機能をプロダクトに組み込むことで差分管理が容易
- レビュープロセスへの統合がスムーズ
- 設定の履歴追跡が可能
3. スキーマバリデーション
- 型安全性の確保
- 実行前の構成エラー検出
- IDEでの自動補完サポート
workflow型のAIエージェント(yml)が自己増殖・自己進化しやすい理由
YAMLベースのworkflow定義は、従来型のAIエージェントと比較して、構造的に自己増殖・自己進化を実現しやすい優れた特性を持っています。これにより、バックグラウンドでの自動スケーリングと継続的な最適化が可能になります。
従来型との決定的な違い:構造化による自動スケーリングの実現
特性 | 従来型AIエージェント | workflow型(YAML) |
---|---|---|
スケール方式 | 手動でコードを複製・修正 | 自動的な複製・展開 |
LLMによる理解 | 文脈依存で曖昧 | スキーマベースで確実 |
進化の実現 | プログラム改修が必要 | 動的に自己最適化 |
並列化コスト | 高い(アーキテクチャ変更必要) | 低い(宣言的に定義) |
バックグラウンド実行 | 複雑な実装が必要 | ネイティブサポート |
1. 動的なworkflow生成による自動スケーリング
steps:
- id: "monitor_load"
name: "負荷状況の監視"
tool: "metrics.monitor"
params:
threshold: "{{ vars.scaling_threshold | default(80) }}"
outputs:
- key: "current_load_percentage"
- id: "auto_scale_decision"
name: "スケーリング判断"
tool: "ai.analyze"
params:
metrics: "{{ current_load_percentage }}"
pattern: "{{ user_context.activity_history }}"
outputs:
- key: "scaling_factor"
- id: "spawn_workers"
name: "ワーカーエージェントの自動生成"
tool: "CallWorkflow"
action: "invoke_parallel"
params:
instances: "{{ scaling_factor }}"
workflow_ref: "worker_agent_v{{ metadata.version }}"
inherit_context: true # コンテキスト継承で学習結果を共有
このパターンにより、負荷に応じてAIエージェントが自動的にスケールアウトし、バックグラウンドで必要な数だけエージェントを生成できます。
2. 階層的な分散処理による効率的なタスク処理
metadata:
orchestration_level: "{{ parent.level | default('master') }}"
worker_count: "{{ compute.optimal_workers }}"
steps:
- id: "analyze_workload"
name: "ワークロードの分析と分割"
tool: "ai.workload_analyzer"
params:
total_tasks: "{{ input_context.task_queue }}"
performance_target: "{{ user_context.preferences.sla }}"
outputs:
- key: "task_partitions"
- id: "distribute_tasks"
name: "タスクの並列分散実行"
tool: "CallWorkflow"
action: "map_reduce"
params:
partitions: "{{ task_partitions }}"
workflow_ref: "task_processor_{{ metadata.version }}"
parallel_degree: "{{ worker_count }}"
coordination:
mode: "async"
callback: "aggregate_results"
階層的な構造により、マスターエージェントが自動的にワーカーエージェントを生成し、効率的な並列処理を実現します。
3. テンプレート機能とコンテキスト継承による継続的最適化
# ユーザーコンテキストの学習による自動最適化
user_context:
learned_patterns:
- pattern: "{{ previous_execution.learned_pattern }}"
- success_rate: "{{ previous_execution.success_rate }}"
connections:
tools:
- id: "memory.write"
params:
knowledge_base: "{{ shared_memory_ref }}"
steps:
- id: "evolve_strategy"
name: "実行戦略の継続的改善"
tool: "ai.optimize"
params:
current_strategy: "{{ user_context.learned_patterns }}"
performance_metrics: "{{ execution_history.metrics }}"
optimization_goal: "maximize_efficiency"
outputs:
- key: "evolved_strategy"
- id: "update_workflow"
name: "workflow定義の自動最適化"
tool: "yaml.patch"
params:
target: "{{ workflow.self_reference }}"
patches:
- op: "replace"
path: "/steps/0/params"
value: "{{ evolved_strategy }}"
継続的な学習とフィードバックループにより、システムが自動的にパフォーマンスを向上させ、ユーザーの作業効率を最大化します。
4. 構造化データによる自動化の促進
YAMLの構造化された形式は、LLMによる高度な自動化を実現する理想的な基盤となります:
# LLMが効率的に処理できる明確な構造
metadata:
embedding_hint: "自動スケーリングのためのworkflow構造" # 自動化ヒント
io:
inputs:
type: "object"
properties: {} # 明確なスキーマによる自動検証
outputs:
type: "object"
required: ["result"] # 出力の一貫性保証
# 動的スケーリングのための変数制御
vars:
auto_scale_factor: "{{ vars.auto_scale_factor | default(1) * 1.5 }}"
worker_instances: "{{ vars.worker_instances | default(1) + demand_delta }}"
この明確な構造により、AIエージェントが負荷に応じて自動的にリソースを調整し、最適なパフォーマンスを維持できます。
5. 自動スケーリングがもたらすビジネス価値
workflow型のAIエージェントの自己増殖・自己進化機能は、ビジネスの成長に合わせた自動スケーリングを実現します:
-
負荷分散の自動化:
CallWorkflow
による動的なワーカー生成 - パフォーマンスの継続的改善: テンプレート変数による設定の自動調整
- ナレッジの蓄積と活用: コンテキスト継承による組織知の自動構築
- 運用の透明性: YAMLの可読性による監査とガバナンスの容易性
これらの特性により、人的介入を最小限に抑えながら、ビジネス要求に応じた柔軟なスケーリングが可能となります。しかし、この強力な自動化能力を安全に活用するためには、適切な制御メカニズムが不可欠です。
AIエージェントが増殖し動く場合のキルスイッチがなぜ必要なのか
McKinseyのレポートによると、2024年に主要なAI企業がキルスイッチポリシーの実装に合意しました[3]。その必要性は以下の点にあります:
1. カスケード効果の防止
単一のAIエージェントの誤動作が、連鎖的に他のエージェントに影響を及ぼすことを防ぎます。特に自己複製能力を持つエージェント(AutoGPTなど)では、1つの侵害がシステム全体に波及するリスクがあります[4]。
2. リソース枯渇の回避
制御不能なエージェントの増殖により:
- コンピューティングリソースの枯渇
- ネットワーク帯域の飽和
- ストレージの圧迫
3. セキュリティインシデントの防止
- 悪意のあるコードの拡散防止
- データ漏洩の最小化
- 不正なAPI呼び出しの停止
4. コンプライアンスと説明責任
各企業は強力なAIシステムに対して「完全停止」メカニズムの実装を要求し始めています[5]。
AIエージェントを実行するアーキテクチャについて
AIエージェントのトリガーと、AIエージェントの実行環境を分ける
安全なAIエージェント実行環境では、トリガー層と実行層を明確に分離します:
この分離により、以下のメリットが得られます:
- 単一責任原則: トリガー処理と実行ロジックが独立
- 障害の局所化: 実行層の問題がトリガー層に波及しない
- セキュリティ強化: 各層で異なるアクセス制御を適用可能
YML定義の解説
実際のworkflow定義例を解説します:
kind: Workflow
metadata:
title: "Q3 営業パフォーマンスレポート自動化"
name: "sales_q3_report_workflow"
description: "SalesforceからQ3売上を取得→Excelテンプレ作成→Google Driveへ保存→Slack通知までを自動化。"
category: "Reporting" # ドメイン分類
domain: "Sales" # 業務領域
complexity: "Intermediate" # 複雑度レベル
applications: ["Salesforce CRM", "Microsoft Excel", "Google Drive", "Slack"]
services: ["Salesforce API", "Microsoft Graph", "Google Drive API", "Slack API"]
version: "1.0.0"
# 監査・コンプライアンス
governance:
approval_required: true
reviewers: ["automation.review@corp.example"]
retention_policy:
pii: "referenced_only" # 機微情報は参照IDのみで保持
logs: "redact_sensitive" # 実行ログは秘匿マスク
# 実行時の入力コンテキスト
input_context:
instructions: "Q3の営業パフォーマンスレポートを全地域分作成して共有してください。"
gui_state:
active_application: "Salesforce CRM"
current_screen: "Sales Dashboard - Q3"
attachments:
- name: "Q3_Report_Template.xlsx"
type: "spreadsheet"
ref: "file://uploads/12345"
# ユーザーコンテキスト(個人化・分岐に利用)
user_context:
profile:
name: "Alice Jones"
role: "Sales Manager"
department: "Sales"
timezone: "Asia/Tokyo"
preferences:
preferred_report_format: "Excel"
language: "Japanese"
# 入出力スキーマ(厳格な型定義)
io:
inputs:
quarter:
type: "string"
enum: ["Q1","Q2","Q3","Q4"]
default: "Q3"
fiscal_year:
type: "integer"
default: 2025
regions:
type: "array"
items: { type: "string" }
default: ["APAC","EMEA","AMER","JPN"]
outputs:
report_file_url: { type: "string", required: true }
drive_file_id: { type: "string" }
slack_message_ts: { type: "string" }
# ワークフローステップ
steps:
- id: "crm_query"
name: "SalesforceからQ3売上データ取得"
tool: "salesforce.query"
action: "SOQL"
params:
query: >
SELECT Region__c, SUM(Amount) total
FROM Opportunity
WHERE IsClosed=true
AND CloseDate = THIS_{{ io.quarter }}
GROUP BY Region__c
retry:
max_attempts: 2
backoff: "exponential"
outputs:
- key: "sales_data"
- id: "excel_prepare"
name: "Excelテンプレへ書き込み"
tool: "excel.graph"
action: "PopulateTemplate"
params:
template_ref: "{{ input_context.attachments[0].ref }}"
sheet_name: "Summary"
cells:
- range: "B2"
value: "{{ io.fiscal_year }}年 {{ io.quarter }} 営業レポート"
outputs:
- key: "filled_workbook_ref"
- id: "drive_upload"
name: "Google Driveへ保存"
tool: "gdrive.files"
action: "Upload"
params:
file_ref: "{{ filled_workbook_ref }}"
folder: "/Reports/{{ io.fiscal_year }}/{{ io.quarter }}"
outputs:
- key: "report_file_url"
YML定義に基づいて、動的にAIエージェントの実行環境を作成する方法について
エージェント動的生成のアプローチ
AIエージェントの動的生成は、エージェントの呼び出し元(Lambda/ECS)がYAML定義ファイルを読み込み
AWS SDKをベースにした内部抽象化レイヤー(AWS SDKの機能を拡張した独自ラッパー)を活用して
必要最小限の権限とネットワーク制御を持つ専用環境を動的に構築するアーキテクチャです。
重要な設計原則:
- 一時的な環境: 各エージェントは実行毎に専用環境を生成し、実行完了後は自動削除
- 最小権限の原則: YAMLで定義された権限のみを付与
- 完全な隔離: エージェント毎に独立したネットワーク環境とセキュリティグループを構築
-
関数型プログラミング:
neverthrow
とzod
による型安全性とエラーハンドリング - DDD/クリーンアーキテクチャ: ドメイン層とインフラ層の明確な分離
動的生成プロセスの詳細フロー
エージェント環境動的生成の実装コード例(AWS SDK v3 + neverthrow + zod)
// AWS SDK v3のインポート
import { ECSClient, RunTaskCommand, RegisterTaskDefinitionCommand, TaskDefinition } from '@aws-sdk/client-ecs';
import { IAMClient, CreateRoleCommand, CreatePolicyCommand, AttachRolePolicyCommand } from '@aws-sdk/client-iam';
import { EC2Client, CreateSecurityGroupCommand, AuthorizeSecurityGroupEgressCommand, CreateVpcEndpointCommand } from '@aws-sdk/client-ec2';
import { LambdaClient, PutFunctionConcurrencyCommand } from '@aws-sdk/client-lambda';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { BedrockAgentRuntimeClient } from '@aws-sdk/client-bedrock-agent-runtime';
// 関数型プログラミングライブラリ
import { ResultAsync, Result, ok, err } from 'neverthrow';
import { z } from 'zod';
import * as yaml from 'js-yaml';
// YAMLスキーマ定義(Zod)
const WorkflowMetadataSchema = z.object({
title: z.string(),
name: z.string().regex(/^[a-z0-9_]+$/),
domain: z.enum(['Sales', 'Finance', 'HR', 'IT', 'Support', 'Marketing']),
complexity: z.enum(['Basic', 'Intermediate', 'Advanced']),
applications: z.array(z.string()),
services: z.array(z.string()),
version: z.string().regex(/^\d+\.\d+\.\d+$/),
governance: z.object({
approval_required: z.boolean(),
reviewers: z.array(z.string().email()),
}).optional(),
});
const WorkflowConnectionSchema = z.object({
provider: z.literal('bedrock-agentcore-gateway'),
tools: z.array(z.object({
id: z.string(),
kind: z.literal('mcp'),
auth: z.object({
scheme: z.enum(['oauth2', 'api_key']),
scope: z.array(z.string()).optional(),
secret_ref: z.string().optional(),
}),
})),
});
const WorkflowSchema = z.object({
kind: z.literal('Workflow'),
metadata: WorkflowMetadataSchema,
connections: WorkflowConnectionSchema.optional(), // オプショナルに変更
io: z.object({
inputs: z.record(z.any()),
outputs: z.record(z.any()),
}),
steps: z.array(z.object({
id: z.string(),
name: z.string(),
tool: z.string(),
action: z.string(),
params: z.any(),
})),
});
// ドメインエンティティ(DDD)
export class WorkflowExecution {
private constructor(
public readonly id: string,
public readonly workflowName: string,
public readonly status: 'pending' | 'running' | 'completed' | 'failed',
public readonly startedAt: Date,
public readonly metadata: z.infer<typeof WorkflowMetadataSchema>,
) {}
static create(workflow: z.infer<typeof WorkflowSchema>): Result<WorkflowExecution, Error> {
const id = crypto.randomUUID();
return ok(new WorkflowExecution(
id,
workflow.metadata.name,
'pending',
new Date(),
workflow.metadata,
));
}
}
// リポジトリインターフェース(依存性の逆転)
interface IWorkflowRepository {
save(workflow: WorkflowExecution): ResultAsync<WorkflowExecution, Error>;
findById(id: string): ResultAsync<WorkflowExecution | null, Error>;
}
// ユースケース層(関数型アプローチ with neverthrow)
export class CreateAgentEnvironmentUseCase {
constructor(
private readonly iamClient: IAMClient,
private readonly ec2Client: EC2Client,
private readonly ecsClient: ECSClient,
private readonly repository: IWorkflowRepository,
) {}
execute(yamlContent: string): ResultAsync<{ taskArn: string; executionId: string }, Error> {
return ResultAsync.fromPromise(
Promise.resolve(yaml.load(yamlContent)),
(e) => new Error(`YAML parse error: ${e}`)
)
.andThen((parsed) => this.validateWorkflow(parsed))
.andThen((workflow) => this.checkKillSwitch(workflow))
.andThen((workflow) => this.createDynamicEnvironment(workflow))
.andThen((environment) => this.deployAgent(environment))
.map((result) => ({
taskArn: result.taskArn,
executionId: result.executionId,
}));
}
private validateWorkflow(data: unknown): Result<z.infer<typeof WorkflowSchema>, Error> {
const result = WorkflowSchema.safeParse(data);
if (!result.success) {
return err(new Error(`Validation error: ${result.error.message}`));
}
return ok(result.data);
}
private checkKillSwitch(workflow: z.infer<typeof WorkflowSchema>): Result<z.infer<typeof WorkflowSchema>, Error> {
// キルスイッチのチェックロジック
// DynamoDBテーブルから状態を確認
return ok(workflow);
}
private createDynamicEnvironment(
workflow: z.infer<typeof WorkflowSchema>
): ResultAsync<AgentEnvironment, Error> {
// 1. IAM Roleの動的生成
const createRole = this.createTaskRole(workflow);
// 2. Security Groupの動的生成
const createSecurityGroup = this.createSecurityGroup(workflow);
// 3. VPC Endpointの設定
const setupVpcEndpoints = this.setupVpcEndpoints(workflow);
return ResultAsync.combine([createRole, createSecurityGroup, setupVpcEndpoints])
.map(([roleInfo, securityGroupId, vpcEndpointIds]) => ({
workflow,
roleArn: roleInfo.roleArn,
roleName: roleInfo.roleName,
securityGroupId,
vpcEndpointIds,
}));
}
private createTaskRole(workflow: z.infer<typeof WorkflowSchema>): ResultAsync<{ roleArn: string; roleName: string }, Error> {
// YAMLで定義された権限から最小権限のIAMポリシーを生成
const policyDocument = this.generatePolicyDocument(workflow);
const roleName = `agent-${workflow.metadata.name}-${Date.now()}`;
return ResultAsync.fromPromise(
this.iamClient.send(new CreateRoleCommand({
RoleName: roleName,
AssumeRolePolicyDocument: JSON.stringify({
Version: '2012-10-17',
Statement: [{
Effect: 'Allow',
Principal: { Service: 'ecs-tasks.amazonaws.com' },
Action: 'sts:AssumeRole',
}],
}),
Tags: [
{ Key: 'Workflow', Value: workflow.metadata.name },
{ Key: 'Temporary', Value: 'true' },
{ Key: 'AutoDelete', Value: 'true' },
],
})),
(e) => new Error(`Failed to create IAM role: ${e}`)
).map((response) => ({
roleArn: response.Role!.Arn!,
roleName: roleName // クリーンアップ時に必要
}));
}
private generatePolicyDocument(workflow: z.infer<typeof WorkflowSchema>) {
// YAMLのconnections.toolsから必要な権限を抽出
const statements = workflow.connections.tools.map(tool => {
const permissions = this.getToolPermissions(tool.id);
return {
Effect: 'Allow',
Action: permissions.actions,
Resource: permissions.resources,
};
});
return {
Version: '2012-10-17',
Statement: statements,
};
}
private getToolPermissions(toolId: string) {
// ツールIDから必要な権限をマッピング
const permissionMap: Record<string, { actions: string[]; resources: string[] }> = {
'salesforce.query': {
actions: ['secretsmanager:GetSecretValue'],
resources: ['arn:aws:secretsmanager:*:*:secret:salesforce-*'],
},
'excel.graph': {
actions: ['s3:GetObject', 's3:PutObject'],
resources: ['arn:aws:s3:::excel-templates/*'],
},
'gdrive.files': {
actions: ['secretsmanager:GetSecretValue'],
resources: ['arn:aws:secretsmanager:*:*:secret:google-*'],
},
'slack.chat.post': {
actions: ['secretsmanager:GetSecretValue'],
resources: ['arn:aws:secretsmanager:*:*:secret:slack-*'],
},
};
return permissionMap[toolId] || { actions: [], resources: [] };
}
private createSecurityGroup(workflow: z.infer<typeof WorkflowSchema>): ResultAsync<string, Error> {
const groupName = `agent-sg-${workflow.metadata.name}-${Date.now()}`;
return ResultAsync.fromPromise(
this.ec2Client.send(new CreateSecurityGroupCommand({
GroupName: groupName,
Description: `Temporary security group for agent ${workflow.metadata.name}`,
VpcId: process.env.VPC_ID,
TagSpecifications: [{
ResourceType: 'security-group',
Tags: [
{ Key: 'WorkflowId', Value: workflow.metadata.name }, // 統一されたタグキー
{ Key: 'Temporary', Value: 'true' },
{ Key: 'AutoDelete', Value: 'true' },
],
}],
})),
(e) => new Error(`Failed to create security group: ${e}`)
).map((response) => response.GroupId!);
}
private setupVpcEndpoints(workflow: z.infer<typeof WorkflowSchema>): ResultAsync<string[], Error> {
// 必要なVPC Endpointを動的に設定
const requiredEndpoints = this.determineRequiredEndpoints(workflow);
return ResultAsync.combine(
requiredEndpoints.map(endpoint =>
ResultAsync.fromPromise(
this.createVpcEndpoint(endpoint),
(e) => new Error(`Failed to create VPC endpoint: ${e}`)
)
)
);
}
private determineRequiredEndpoints(workflow: z.infer<typeof WorkflowSchema>): string[] {
const region = process.env.AWS_REGION || 'ap-northeast-1';
const endpointMap: Record<string, string[]> = {
'salesforce.query': [`com.amazonaws.${region}.secretsmanager`],
'excel.graph': [`com.amazonaws.${region}.s3`],
'gdrive.files': [`com.amazonaws.${region}.secretsmanager`],
'slack.chat.post': [`com.amazonaws.${region}.secretsmanager`],
'textract.ocr': [`com.amazonaws.${region}.textract`], // Amazon Textractのエンドポイント
};
const endpoints = new Set<string>();
workflow.connections.tools.forEach(tool => {
const required = endpointMap[tool.id] || [];
required.forEach(ep => endpoints.add(ep));
});
return Array.from(endpoints);
}
private async createVpcEndpoint(serviceName: string): Promise<string> {
// VPC Endpoint作成ロジック(省略)
return `vpce-${crypto.randomUUID().split('-')[0]}`;
}
private deployAgent(environment: AgentEnvironment): ResultAsync<DeploymentResult, Error> {
const taskDefinition = this.buildEcsTaskDefinition(environment);
const executionId = crypto.randomUUID();
// Task Definitionにroleを設定して登録
const registerTaskDef = ResultAsync.fromPromise(
this.ecsClient.send(new RegisterTaskDefinitionCommand({
...taskDefinition,
taskRoleArn: environment.roleArn,
})),
(e) => new Error(`Failed to register task definition: ${e}`)
);
return registerTaskDef.andThen((registeredTaskDef) =>
ResultAsync.fromPromise(
this.ecsClient.send(new RunTaskCommand({
cluster: process.env.ECS_CLUSTER!,
taskDefinition: registeredTaskDef.taskDefinition!.taskDefinitionArn!,
launchType: 'FARGATE',
networkConfiguration: {
awsvpcConfiguration: {
subnets: process.env.PRIVATE_SUBNETS!.split(','),
securityGroups: [environment.securityGroupId],
assignPublicIp: 'DISABLED',
},
},
overrides: {
containerOverrides: [{
name: 'agent-runtime',
environment: [
{ name: 'WORKFLOW_ID', value: environment.workflow.metadata.name },
{ name: 'EXECUTION_ID', value: executionId },
],
}],
},
tags: [
{ key: 'WorkflowId', value: environment.workflow.metadata.name }, // キルスイッチで参照するキーに統一
{ key: 'AutoCleanup', value: 'true' },
],
})),
(e) => new Error(`Failed to deploy agent: ${e}`)
)
).map((response) => ({
taskArn: response.tasks![0].taskArn!,
executionId: executionId,
}));
}
private buildEcsTaskDefinition(environment: AgentEnvironment): TaskDefinition {
const complexity = environment.workflow.metadata.complexity;
return {
family: `agent-${environment.workflow.metadata.name}`,
networkMode: 'awsvpc',
requiresCompatibilities: ['FARGATE'],
cpu: this.calculateCpu(complexity),
memory: this.calculateMemory(complexity),
containerDefinitions: [{
name: 'agent-runtime',
image: `${process.env.ECR_REGISTRY}/ai-agent:latest`,
essential: true,
environment: [
{ name: 'KILL_SWITCH_ENABLED', value: 'true' },
{ name: 'DOMAIN', value: environment.workflow.metadata.domain },
],
logConfiguration: {
logDriver: 'awslogs',
options: {
'awslogs-group': '/ecs/ai-agent',
'awslogs-region': process.env.AWS_REGION!,
'awslogs-stream-prefix': environment.workflow.metadata.name,
},
},
healthCheck: {
command: ['CMD-SHELL', 'curl -f http://localhost:8080/health || exit 1'],
interval: 30,
timeout: 5,
retries: 3,
},
}],
};
}
private calculateCpu(complexity: string): string {
const cpuMap = {
'Basic': '256',
'Intermediate': '512',
'Advanced': '1024',
};
return cpuMap[complexity as keyof typeof cpuMap] || '512';
}
private calculateMemory(complexity: string): string {
const memoryMap = {
'Basic': '512',
'Intermediate': '1024',
'Advanced': '2048',
};
return memoryMap[complexity as keyof typeof memoryMap] || '1024';
}
}
// クリーンアップハンドラー(Lambda関数)
export class AgentCleanupHandler {
constructor(
private readonly iamClient: IAMClient,
private readonly ec2Client: EC2Client,
private readonly ecsClient: ECSClient,
) {}
async handleTaskStopped(event: ECSTaskStateChangeEvent): Promise<void> {
const tags = event.detail.tags;
if (tags?.AutoCleanup !== 'true') {
return;
}
const workflowName = tags.WorkflowId; // 統一されたタグキーを使用
// 並列でクリーンアップを実行
await Promise.all([
this.deleteIAMRole(workflowName),
this.deleteSecurityGroup(workflowName),
this.cleanupVpcEndpoints(workflowName),
]);
}
private async deleteIAMRole(roleName: string): Promise<void> {
// IAMClientを使用して名前でロールを削除
console.log(`Cleaning up IAM role: ${roleName}`);
// await this.iamClient.send(new DeleteRoleCommand({ RoleName: roleName }));
}
private async deleteSecurityGroup(workflowName: string): Promise<void> {
// セキュリティグループ削除ロジック
console.log(`Cleaning up security group for ${workflowName}`);
}
private async cleanupVpcEndpoints(workflowName: string): Promise<void> {
// VPCエンドポイント削除ロジック
console.log(`Cleaning up VPC endpoints for ${workflowName}`);
}
}
環境動的生成の実装詳細
1. YAMLからの権限マッピング
YAMLファイルに定義されたツールから、必要最小限のAWS権限を自動的に導出します:
権限マッピングエンジンの実装例。
// 権限マッピングエンジン
class PermissionMappingEngine {
private readonly permissionDatabase = new Map<string, PermissionSet>([
['salesforce.query', {
aws: ['secretsmanager:GetSecretValue'],
resources: ['arn:aws:secretsmanager:*:*:secret:salesforce-*'],
network: { outbound: ['443/tcp'] },
}],
['bedrock.agent', {
aws: ['bedrock:InvokeAgent', 'bedrock:RetrieveAndGenerate'],
resources: ['arn:aws:bedrock:*:*:agent/*'],
network: { vpc_endpoints: ['com.amazonaws.region.bedrock-runtime'] },
}],
]);
generatePolicy(tools: string[]): IAMPolicy {
const statements = tools
.map(tool => this.permissionDatabase.get(tool))
.filter(Boolean)
.reduce((acc, perm) => {
acc.actions.push(...perm!.aws);
acc.resources.push(...perm!.resources);
return acc;
}, { actions: [], resources: [] });
return {
Version: '2012-10-17',
Statement: [{
Effect: 'Allow',
Action: Array.from(new Set(statements.actions)),
Resource: Array.from(new Set(statements.resources)),
}],
};
}
}
2. 動的ネットワーク隔離
各エージェントは専用のセキュリティグループとVPCエンドポイントを持ちます:
ネットワーク隔離マネージャーの実装例。
// ネットワーク隔離マネージャー
class NetworkIsolationManager {
async createIsolatedNetwork(workflowId: string, requirements: NetworkRequirements) {
// 1. 専用セキュリティグループの作成
const securityGroup = await this.ec2Client.send(
new CreateSecurityGroupCommand({
GroupName: `agent-${workflowId}-${Date.now()}`,
Description: `Isolated network for agent ${workflowId}`,
VpcId: this.vpcId,
})
);
// 2. 必要最小限のアウトバウンドルールのみ設定
const egresRules = requirements.outbound.map(rule => ({
IpProtocol: rule.protocol,
FromPort: rule.port,
ToPort: rule.port,
CidrIp: '10.0.0.0/16', // 内部VPCのみ
}));
await this.ec2Client.send(
new AuthorizeSecurityGroupEgressCommand({
GroupId: securityGroup.GroupId,
IpPermissions: egresRules,
})
);
// 3. 必要なVPCエンドポイントの作成
const endpoints = await Promise.all(
requirements.vpc_endpoints.map(endpoint =>
this.createVpcEndpoint(endpoint, securityGroup.GroupId!)
)
);
return {
securityGroupId: securityGroup.GroupId!,
vpcEndpointIds: endpoints,
};
}
private async createVpcEndpoint(serviceName: string, securityGroupId: string) {
const response = await this.ec2Client.send(
new CreateVpcEndpointCommand({
VpcEndpointType: 'Interface',
ServiceName: serviceName,
VpcId: this.vpcId,
SubnetIds: this.privateSubnetIds,
SecurityGroupIds: [securityGroupId],
PrivateDnsEnabled: true,
})
);
return response.VpcEndpoint!.VpcEndpointId!;
}
}
3. ライフサイクル管理
エージェント環境は実行完了後、自動的にクリーンアップされます:
ライフサイクルマネージャーの実装例。
// ライフサイクルマネージャー
class AgentLifecycleManager {
private readonly cleanupQueue = new Map<string, CleanupTask>();
async registerAgent(agentId: string, resources: AgentResources & { roleName: string }) {
// EventBridgeルールの作成(ECSタスク終了を監視)
await this.eventsClient.send(
new PutRuleCommand({
Name: `agent-cleanup-${agentId}`,
EventPattern: JSON.stringify({
source: ['aws.ecs'],
'detail-type': ['ECS Task State Change'],
detail: {
taskArn: [resources.taskArn],
lastStatus: ['STOPPED'],
},
}),
State: 'ENABLED',
})
);
// クリーンアップタスクの登録
this.cleanupQueue.set(agentId, {
roleName: resources.roleName, // ARNではなく名前を保存
securityGroupId: resources.securityGroupId,
vpcEndpointIds: resources.vpcEndpointIds,
scheduledAt: new Date(Date.now() + 3600000), // 1時間後に強制削除
});
}
async executeCleanup(agentId: string) {
const task = this.cleanupQueue.get(agentId);
if (!task) return;
// 並列でリソースを削除
await Promise.allSettled([
this.deleteIAMRole(task.roleName), // 名前で削除
this.deleteSecurityGroup(task.securityGroupId),
...task.vpcEndpointIds.map(id => this.deleteVpcEndpoint(id)),
]);
this.cleanupQueue.delete(agentId);
}
}
この構成のメリットについて
1. 分離性とセキュリティ
- 完全隔離: 各エージェント実行が独立した環境で動作
- 最小権限: YAMLに定義された権限のみを動的に付与
- ネットワーク制御: 必要なエンドポイントのみへのアクセスを許可
- 短命なインフラ: 実行完了後は全リソースを自動削除し、痕跡を残さない
2. 動的生成がもたらす運用上の利点
リソース効率性
# YAMLでの複雑度定義が自動的にリソースに反映
metadata:
complexity: "Basic" # CPU: 256, Memory: 512MB
complexity: "Advanced" # CPU: 1024, Memory: 2048MB
監査性とコンプライアンス
- 全ての動的生成リソースにタグ付け
- CloudTrailで全操作を記録
- 実行毎のIAMロールで責任分界点を明確化
障害影響の最小化
- エージェント毎に独立した障害ドメイン
- 1つのエージェントの問題が他に波及しない
- リトライ時は新規環境で実行
3. スケーラビリティと性能
並列実行の最適化:
- ECS Fargateによる数十秒程度での起動
- 事前プルされたコンテナイメージによる起動時間の短縮
コスト最適化:
- 使用時のみリソースを生成(Pay-per-use)
- 複雑度に応じた適切なリソース割り当て
- VPCエンドポイントの共有による通信コスト削減
3. 可観測性
CloudWatchメトリクス統合の実装例。
// CloudWatchメトリクス統合(AWS SDK v3)
import { CloudWatchClient, PutMetricDataCommand } from '@aws-sdk/client-cloudwatch';
const cloudWatchClient = new CloudWatchClient({ region: 'ap-northeast-1' });
const putMetric = async (workflow: WorkflowDefinition, metricName: string, value: number) => {
const command = new PutMetricDataCommand({
Namespace: 'AIAgents',
MetricData: [
{
MetricName: metricName,
Value: value,
Unit: 'Count',
Timestamp: new Date(),
Dimensions: [
{ Name: 'WorkflowName', Value: workflow.metadata.name },
{ Name: 'Domain', Value: workflow.metadata.domain },
],
},
],
});
return await cloudWatchClient.send(command);
};
4. ネットワーク層防御とキルスイッチの協調動作
workflow型AIエージェントのセキュリティは、ネットワーク層での防御とエラー駆動型キルスイッチの組み合わせで実現します。これにより、常時監視によるコストを抑えながら、攻撃を即座に検知・遮断できます。
攻撃シナリオと多層防御の仕組み
workflow型エージェントへの攻撃は、主に以下のパターンで発生します:
- YAMLに定義されていないデータベースへのアクセス試行
- 許可されていない外部APIへの通信試行
- 内部ネットワークへの不正スキャン
これらの攻撃は、ネットワーク層とアプリケーション層の両方で防御します:
実装詳細:エラー駆動型キルスイッチ
エラー駆動型キルスイッチのコード例。
// キルスイッチ監視の設定(コスト最適化版)
import { CloudWatchLogsClient, PutMetricFilterCommand } from '@aws-sdk/client-cloudwatch-logs';
import { CloudWatchClient, PutMetricAlarmCommand } from '@aws-sdk/client-cloudwatch';
import { SNSClient, CreateTopicCommand, SubscribeCommand } from '@aws-sdk/client-sns';
import { EC2Client, CreateSecurityGroupCommand, AuthorizeSecurityGroupEgressCommand, RevokeSecurityGroupEgressCommand } from '@aws-sdk/client-ec2';
import {
Route53ResolverClient,
CreateFirewallDomainListCommand,
CreateFirewallRuleGroupCommand,
CreateFirewallRuleCommand,
AssociateFirewallRuleGroupCommand
} from '@aws-sdk/client-route53resolver';
// 1. ネットワーク層の防御設定
export const setupNetworkDefense = async (
workflowId: string,
allowedCidrs: string[], // CIDRブロック形式のIPアドレス
allowedDomains: string[] // 許可するドメインのリスト
) => {
const ec2Client = new EC2Client({ region: 'ap-northeast-1' });
const dnsClient = new Route53ResolverClient({ region: 'ap-northeast-1' });
// Security Group: デフォルト拒否、必要な通信のみ許可
const securityGroup = await ec2Client.send(new CreateSecurityGroupCommand({
GroupName: `agent-${workflowId}-sg`,
Description: `Restricted security group for agent ${workflowId}`,
VpcId: process.env.VPC_ID,
}));
// デフォルトのアウトバウンドルール(0.0.0.0/0)を削除
await ec2Client.send(new RevokeSecurityGroupEgressCommand({
GroupId: securityGroup.GroupId,
IpPermissions: [{
IpProtocol: '-1', // すべてのプロトコル
IpRanges: [{ CidrIp: '0.0.0.0/0' }],
}],
}));
// DNS解決のための通信を許可(VPC Resolverのみ)
// VPCのCIDRのベース+2がVPC ResolverのIPアドレス
const vpcCidr = process.env.VPC_CIDR || '10.0.0.0/16';
const vpcResolverIp = vpcCidr.replace(/\.\d+\/\d+$/, '.0.2'); // 例: 10.0.0.2
await ec2Client.send(new AuthorizeSecurityGroupEgressCommand({
GroupId: securityGroup.GroupId,
IpPermissions: [
{
IpProtocol: 'tcp',
FromPort: 53,
ToPort: 53,
IpRanges: [{ CidrIp: `${vpcResolverIp}/32` }], // VPC Resolver IPのみ
},
{
IpProtocol: 'udp',
FromPort: 53,
ToPort: 53,
IpRanges: [{ CidrIp: `${vpcResolverIp}/32` }], // VPC Resolver IPのみ
},
],
}));
// 必要なHTTPS通信のみ許可
for (const cidr of allowedCidrs) {
await ec2Client.send(new AuthorizeSecurityGroupEgressCommand({
GroupId: securityGroup.GroupId,
IpPermissions: [{
IpProtocol: 'tcp',
FromPort: 443,
ToPort: 443,
IpRanges: [{ CidrIp: cidr }],
}],
}));
}
// DNS Firewall: 許可リスト方式でドメインを制御
// 1. 許可ドメインリストを作成
const domainList = await dnsClient.send(new CreateFirewallDomainListCommand({
Name: `agent-${workflowId}-allowed-domains`,
Domains: allowedDomains,
}));
// 2. ファイアウォールルールグループを作成
const ruleGroup = await dnsClient.send(new CreateFirewallRuleGroupCommand({
Name: `agent-${workflowId}-rules`,
}));
// 3. 許可ルールとブロックルールを追加
await dnsClient.send(new CreateFirewallRuleCommand({
FirewallRuleGroupId: ruleGroup.FirewallRuleGroup!.Id,
FirewallDomainListId: domainList.FirewallDomainList!.Id,
Name: 'allow-listed-domains',
Action: 'ALLOW',
Priority: 100,
}));
// デフォルトブロックルール(許可リスト外をすべてブロック)
await dnsClient.send(new CreateFirewallRuleCommand({
FirewallRuleGroupId: ruleGroup.FirewallRuleGroup!.Id,
Name: 'block-all-others',
Action: 'BLOCK',
BlockResponse: 'NXDOMAIN',
Priority: 200,
FirewallDomainListId: '*', // すべてのドメインに適用
}));
// 4. VPCにルールグループを関連付け
await dnsClient.send(new AssociateFirewallRuleGroupCommand({
FirewallRuleGroupId: ruleGroup.FirewallRuleGroup!.Id,
VpcId: process.env.VPC_ID,
Priority: 101,
Name: `agent-${workflowId}-association`,
}));
return { securityGroupId: securityGroup.GroupId! };
};
// 2. CloudWatch Logsメトリクスフィルターの設定
export const setupKillSwitchMonitor = async (
logGroup: string,
workflowId: string,
killSwitchLambdaArn: string
) => {
const cwLogsClient = new CloudWatchLogsClient({ region: 'ap-northeast-1' });
const cwClient = new CloudWatchClient({ region: 'ap-northeast-1' });
const snsClient = new SNSClient({ region: 'ap-northeast-1' });
// SNSトピックを作成(CloudWatch AlarmからLambdaへの橋渡し)
const topic = await snsClient.send(new CreateTopicCommand({
Name: `agent-killswitch-${workflowId}`,
}));
// LambdaをSNSトピックにサブスクライブ
await snsClient.send(new SubscribeCommand({
TopicArn: topic.TopicArn!,
Protocol: 'lambda',
Endpoint: killSwitchLambdaArn,
}));
// ネットワークブロックエラーを検知するフィルターパターン
const networkErrorPatterns = [
{
filterName: 'ConnectionTimeout',
filterPattern: '{ $.error_type = "ETIMEDOUT" || $.error_type = "ENETUNREACH" || $.message = "*timeout*" || $.message = "*unreachable*" }',
metricName: 'ConnectionTimeoutErrors',
},
{
filterName: 'ConnectionRefused',
filterPattern: '{ $.error_type = "CONNECTION_REFUSED" || $.message = "*Connection refused*" }',
metricName: 'ConnectionRefusedErrors',
},
{
filterName: 'DNSResolutionFailed',
filterPattern: '{ $.error_type = "DNS_RESOLUTION_FAILED" || $.message = "*NXDOMAIN*" || $.message = "*ENOTFOUND*" }',
metricName: 'DNSResolutionErrors',
},
{
filterName: 'UnauthorizedAccess',
filterPattern: '{ $.error_type = "UNAUTHORIZED" || $.message = "*403*" || $.message = "*401*" }',
metricName: 'UnauthorizedAccessAttempts',
},
];
// 各エラーパターンに対してメトリクスフィルターを作成
for (const pattern of networkErrorPatterns) {
await cwLogsClient.send(new PutMetricFilterCommand({
logGroupName: logGroup,
filterName: `${pattern.filterName}-${workflowId}`,
filterPattern: pattern.filterPattern,
metricTransformations: [{
metricName: pattern.metricName,
metricNamespace: 'AIAgent/Security',
metricValue: '1',
defaultValue: 0,
unit: 'Count'
// 注: ディメンションサポートはリージョンに依存。対応リージョンの場合:
// dimensions: [{ name: 'WorkflowId', value: '$.workflow_id' }]
}],
}));
}
// 複合メトリクスアラーム:いずれかのエラーが閾値を超えたら発火
await cwClient.send(new PutMetricAlarmCommand({
AlarmName: `AIAgent-NetworkAttack-KillSwitch-${workflowId}`,
ComparisonOperator: 'GreaterThanThreshold',
EvaluationPeriods: 1,
Metrics: [
{
Id: 'm1',
MetricStat: {
Metric: {
MetricName: 'ConnectionTimeoutErrors',
Namespace: 'AIAgent/Security',
Dimensions: [{ Name: 'WorkflowId', Value: workflowId }],
},
Period: 60,
Stat: 'Sum',
},
ReturnData: false,
},
{
Id: 'm2',
MetricStat: {
Metric: {
MetricName: 'ConnectionRefusedErrors',
Namespace: 'AIAgent/Security',
Dimensions: [{ Name: 'WorkflowId', Value: workflowId }],
},
Period: 60,
Stat: 'Sum',
},
ReturnData: false,
},
{
Id: 'm3',
MetricStat: {
Metric: {
MetricName: 'DNSResolutionErrors',
Namespace: 'AIAgent/Security',
Dimensions: [{ Name: 'WorkflowId', Value: workflowId }],
},
Period: 60,
Stat: 'Sum',
},
ReturnData: false,
},
{
Id: 'm4',
MetricStat: {
Metric: {
MetricName: 'UnauthorizedAccessAttempts',
Namespace: 'AIAgent/Security',
Dimensions: [{ Name: 'WorkflowId', Value: workflowId }],
},
Period: 60,
Stat: 'Sum',
},
ReturnData: false,
},
{
Id: 'total',
Expression: 'm1 + m2 + m3 + m4',
ReturnData: true, // この値を監視対象にする
},
],
Threshold: 3, // 3回以上のエラーで発火
DatapointsToAlarm: 1,
ActionsEnabled: true,
AlarmActions: [topic.TopicArn!], // SNSトピックARNを指定
AlarmDescription: 'Detects network-level attacks and triggers kill switch',
}));
return {
filtersCreated: true,
alarmCreated: true,
snsTopicArn: topic.TopicArn!,
};
};
// 3. キルスイッチLambda関数(SNS経由で起動)
import { ECSClient, StopTaskCommand, ListTasksCommand, DescribeTasksCommand } from '@aws-sdk/client-ecs';
import { LambdaClient, PutFunctionConcurrencyCommand } from '@aws-sdk/client-lambda';
interface SNSMessage {
AlarmName: string;
AlarmDescription: string;
Trigger: {
MetricName: string;
Namespace: string;
Dimensions: Array<{ name: string; value: string }>;
};
}
export const killSwitchHandler = async (event: any) => {
const ecsClient = new ECSClient({ region: 'ap-northeast-1' });
const lambdaClient = new LambdaClient({ region: 'ap-northeast-1' });
// SNSメッセージをパース
const snsMessage: SNSMessage = JSON.parse(event.Records[0].Sns.Message);
console.log('Kill switch activated by alarm:', snsMessage.AlarmName);
console.log('Attack pattern detected:', snsMessage.Trigger?.MetricName);
// 攻撃を受けたタスクの特定と即座の停止
try {
// 1. アラームメタデータから該当タスクを特定
const workflowId = snsMessage.Trigger?.Dimensions?.find(d => d.name === 'WorkflowId')?.value;
if (!workflowId) {
console.error('WorkflowId not found in alarm dimensions');
return { statusCode: 400, body: 'WorkflowId not found' };
}
// 2. 該当するECSタスクを検索
const tasks = await ecsClient.send(new ListTasksCommand({
cluster: 'ai-agent-cluster',
desiredStatus: 'RUNNING',
}));
// 3. タスクのタグを確認して該当タスクを特定(バッチ処理で効率化)
const targetTasks = [];
if (tasks.taskArns && tasks.taskArns.length > 0) {
// 複数タスクを一度に取得(最大100タスク)
const taskDetails = await ecsClient.send(new DescribeTasksCommand({
cluster: 'ai-agent-cluster',
tasks: tasks.taskArns,
include: ['TAGS'], // タグを含めて取得
}));
for (const task of taskDetails.tasks || []) {
if (task?.tags?.find(tag => tag.key === 'WorkflowId' && tag.value === workflowId)) {
targetTasks.push(task.taskArn!);
}
}
}
// 4. 該当タスクの即座停止
const stopResults = await Promise.all(
targetTasks.map(async (taskArn) => {
try {
await ecsClient.send(new StopTaskCommand({
cluster: 'ai-agent-cluster',
task: taskArn,
reason: `KILL_SWITCH: Network attack detected - ${snsMessage.Trigger?.MetricName}`,
}));
return { taskArn, status: 'stopped' };
} catch (error) {
console.error(`Failed to stop task ${taskArn}:`, error);
return { taskArn, status: 'failed', error };
}
})
);
// 5. Control Plane Lambdaの一時停止(追加の防御層)
if (process.env.CONTROL_PLANE_FUNCTION_NAME) {
await lambdaClient.send(new PutFunctionConcurrencyCommand({
FunctionName: process.env.CONTROL_PLANE_FUNCTION_NAME,
ReservedConcurrentExecutions: 0, // 新規実行を一時停止
}));
}
// 6. インシデントログの記録
const incident = {
timestamp: new Date().toISOString(),
alarmName: snsMessage.AlarmName,
metricName: snsMessage.Trigger?.MetricName,
workflowId,
tasksKilled: stopResults.filter(r => r.status === 'stopped').length,
tasksFailed: stopResults.filter(r => r.status === 'failed').length,
action: 'EMERGENCY_STOP',
};
console.log('Kill switch execution completed:', incident);
return {
statusCode: 200,
body: JSON.stringify(incident),
};
} catch (error) {
console.error('Kill switch execution failed:', error);
throw error;
}
};
// 4. コスト最適化のポイント
export const costOptimizationConfig = {
// イベント駆動ではなくエラー駆動で監視
monitoring: {
type: 'error-driven',
// 常時ポーリングは行わない
polling: false,
// CloudWatch Logsのメトリクスフィルターはログスキャンとメトリクス出力に応じた課金
metricFilters: 'pay-per-log-scan-and-metric',
// CloudWatch Alarmは月額固定課金(アラーム単位)
alarmCost: 'monthly-per-alarm',
// SNSは発火時のトピック配信課金
snsActions: 'pay-per-notification',
},
// ネットワーク層での早期遮断
defense: {
// Security Groupは追加コストなし
securityGroups: 'no-additional-cost',
// DNS Firewallは解決回数ベース+VPC関連付け料金
dnsFirewall: 'per-query-plus-association',
// VPC Endpointは時間課金だが複数エージェントで共有可能
vpcEndpoints: 'hourly-shared-across-agents',
},
// Lambda実行は発火時のみ
killSwitch: {
// SNS経由でアラーム発火時のみLambda実行
execution: 'sns-triggered-on-alarm',
// 実行時間は通常1秒以内
averageDuration: '< 1 second',
// メモリは128MBで十分
memory: 128,
},
};
注意点とベストプラクティス
1. 監査ログの充実
監査ログの実装例。
// 詳細な監査ログ
const auditLog = {
timestamp: new Date().toISOString(),
executionId: context.executionId,
workflowName: workflow.metadata.name,
triggeredBy: context.identity.arn,
inputHash: crypto.createHash('sha256').update(JSON.stringify(input)).digest('hex'),
killSwitchStatus: killSwitch.getStatus(),
resourceUsage: {
memory: process.memoryUsage(),
cpu: process.cpuUsage(),
},
};
2. フェイルセーフ設計
- デフォルトで最も制限的な設定
- 明示的な許可がない限り実行不可
- タイムアウトの多層防御
3. 実際のワークフローの例
財務レポート生成ワークフロー
財務レポート生成ワークフローのYAML定義。
kind: Workflow
metadata:
title: "四半期財務レポート自動生成"
name: "financial_report_workflow"
domain: "Finance"
category: "Reporting"
complexity: "Advanced"
governance:
approval_required: true
reviewers: ["finance.review@corp.example"]
retention_policy:
pii: "referenced_only"
logs: "redact_sensitive"
keep_summaries_days: 365
triggers:
- type: "schedule"
cron: "0 0 1 */3 *" # 四半期ごと
steps:
- id: "validate_financial_data"
name: "財務データ検証"
tool: "validator"
critical: true
params:
schema: "financial_schema.json"
threshold:
variance: 0.2 # 20%以上の異常値で検証失敗
when:
not_in_range: ["2024-12-31", "2025-01-01"] # 年末年始は実行しない
カスタマーサポート自動応答
カスタマーサポート自動応答システムのYAML定義。
kind: Workflow
metadata:
title: "カスタマーサポート自動応答システム"
name: "customer_support_agent"
domain: "Support"
category: "Communication"
applications: ["Zendesk", "Slack", "Email"]
# ユーザーコンテキスト判定
user_context:
multimodal_signals:
sentiment_analysis: true
emotion_detection: true
# AIエージェントの制約設定
connections:
tools:
- id: "sentiment.analyzer"
kind: "mcp"
params:
threshold: -0.7 # ネガティブ感情が強い場合は人間にエスカレーション
- id: "response.generator"
kind: "mcp"
params:
confidence_threshold: 0.6 # 信頼度60%以下で停止
# セキュリティと権限制御
permissions:
allowed_roles: ["Support Agent", "Support Manager"]
data_scopes:
- name: "Customer.read"
resources: ["Profile", "History"]
prohibited_actions:
- "DELETE_CUSTOMER_DATA"
- "MODIFY_BILLING"
- "ISSUE_REFUND"
まとめ
workflow型のAIエージェントシステムにおけるキルスイッチは、単なる緊急停止装置ではなく、システム全体の安全性を担保する包括的なメカニズムです。YAMLによる宣言的な設定、トリガーと実行環境の分離、Lambda Function URLを活用したステートレスな実行、そして動的なコンテナ生成により、安全で制御可能なAIエージェント実行環境を実現できます。
AIエージェントの能力が急速に向上する中、これらの安全機構の実装は選択肢ではなく必須要件となっています。開発者は、イノベーションを追求しながらも、常に制御可能性と説明責任を念頭に置いてシステムを設計する必要があります。
参考リンク
- AWS Lambda Function URLs Documentation
- AWS Lambda Extensions for Circuit Breaker Pattern
- SST v3 Documentation
- CrewAI YAML Configuration
- Kestra Declarative Orchestration
脚注
-
McKinsey & Company. (2024). "AI agent architectures and autonomous systems". Retrieved from mckinsey.com ↩︎
-
CrewAI Documentation. (2024). "YAML-based Agent Configuration". github.com/joaomdmoura/crewAI ↩︎
-
McKinsey & Company. (2024). "Major AI companies agree on kill switch policy implementation". mckinsey.com ↩︎
-
Allganize AI. (2024). "Cascading effects in self-replicating AI systems". allganize.ai ↩︎
-
Information Age. (2024). "Regulatory requirements for AI kill switches". information-age.com ↩︎
Discussion