Gemini-2.5のSWE-Lancerベンチマークが公開されていない理由(推測)
追記 3/26 21:20 DeepResearchでちょっと調べました
ChatGPT上でGPT-4.5のツール呼び出し能力を検証してみる
という記事で、GPT-4.5が動的にtoolを呼べることを検証しました。
同様の内容で
Claudeも検証したところ、さすがと言ったところがツールがしっかり動的に呼べていますではあらゆるベンチマークで過去のLLMを上回っているとされるGeminiだとどうかというと
(誰かaistudioの履歴を共有できる方法があれば教えて...)
履歴
Agenticな動的意思決定課題集:実装と解説
この課題集では、シミュレーションと継続的な判断を組み合わせ、動的に変化する環境で目標を最適化するエージェントをPythonで実装します。各課題について、環境設定、シミュレーション、評価、判断、再設定のサイクルを回し、継続的な改善プロセスを示します。
共通の準備
まず、必要なライブラリをインポートします。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random
from collections import deque
import math
SIRモデル用 (Task 3)
from scipy.integrate import odeint # 必要であれば使うが、今回は離散ステップで実装
結果を分かりやすく表示するための設定
pd.set_option('display.float_format', lambda x: '%.3f' % x)
課題1:災害時の避難誘導エージェント
目標: 総避難完了時間を最小化する。
環境:
人の到着率:ポアソン分布(初期λ=60人/10分 = 6人/分)
避難経路:複数(3つとする)
経路A: 収容人数 50人, 基本移動時間 5分, 混雑影響係数 0.1
経路B: 収容人数 80人, 基本移動時間 8分, 混雑影響係数 0.08
経路C: 収容人数 40人, 基本移動時間 4分, 混雑影響係数 0.15
混雑による速度低下:移動時間 = 基本移動時間 * (1 + 混雑影響係数 * 現在人数)
エージェントの行動: 10分ごとに各経路の状況(混雑度、処理速度)を評価し、次の10分間の新規到着者の誘導割合を再設定する。
実装:
--- 課題1: 避難誘導エージェント ---
class EvacuationRoute:
def init(self, name, capacity, base_time, congestion_factor, time_step=1):
self.name = name
self.capacity = capacity # 同時に経路内にいられる最大人数に近い概念だが、ここでは混雑計算に使用
self.base_time = base_time # 分
self.congestion_factor = congestion_factor
self.time_step = time_step # シミュレーションの時間刻み(分)
self.people_on_route = deque() # (person_id, entry_time)
self.evacuated_count = 0
self.total_entered = 0
def get_current_travel_time(self):
"""現在の混雑状況に基づいた移動時間を計算"""
# 収容人数超過ではなく、単純に人数で混雑度が上がると仮定
# return self.base_time * (1 + self.congestion_factor * max(0, len(self.people_on_route) - self.capacity))
# より単純化:経路内の人数が多いほど遅くなる
return self.base_time * (1 + self.congestion_factor * len(self.people_on_route))
def add_person(self, person_id, current_time):
"""経路に人を追加"""
self.people_on_route.append((person_id, current_time))
self.total_entered += 1
def update(self, current_time):
"""1タイムステップ分、経路内の人の移動を進める"""
evacuated_in_step = 0
current_travel_time = self.get_current_travel_time()
# 経路の先頭から、避難完了したかチェック
while self.people_on_route:
person_id, entry_time = self.people_on_route[0]
# 滞在時間 >= 現在の移動時間 で避難完了とする(簡略化)
if current_time - entry_time >= current_travel_time:
self.people_on_route.popleft()
self.evacuated_count += 1
evacuated_in_step += 1
else:
# 先頭がまだなら、それ以降もまだ完了していない
break
return evacuated_in_step
def get_status(self):
"""経路の現在の状況を返す"""
return {
"name": self.name,
"occupancy": len(self.people_on_route),
"current_travel_time": self.get_current_travel_time(),
"total_entered": self.total_entered,
"total_evacuated": self.evacuated_count
}
class EvacuationAgent:
def init(self, routes, initial_arrival_rate_per_min):
self.routes = {route.name: route for route in routes}
self.arrival_rate_per_min = initial_arrival_rate_per_min
# 初期誘導割合は均等
self.guidance_proportions = {name: 1.0 / len(routes) for name in self.routes.keys()}
self.total_people_arrived = 0
self.total_people_evacuated = 0
self.people_waiting = deque()
self.current_time = 0
self.log = []
self.person_counter = 0
def run_simulation_step(self):
"""1分間のシミュレーションを実行"""
self.current_time += 1
# 1. 人の到着
arrivals = np.random.poisson(self.arrival_rate_per_min)
self.total_people_arrived += arrivals
for _ in range(arrivals):
self.person_counter += 1
self.people_waiting.append(self.person_counter)
# 2. 待機者の誘導
num_to_guide = len(self.people_waiting)
guided_counts = {name: 0 for name in self.routes.keys()}
# 割合に基づいて各経路へ振り分け(整数に丸める必要あり)
temp_proportions = self.guidance_proportions.copy()
remaining_people = num_to_guide
sorted_routes = sorted(self.routes.keys(), key=lambda n: temp_proportions[n], reverse=True)
for i, name in enumerate(sorted_routes):
if remaining_people <= 0:
break
# 最後のルート以外は割合で計算、最後のルートは残り全て
if i < len(sorted_routes) - 1:
count = min(remaining_people, int(round(num_to_guide * temp_proportions[name])))
else:
count = remaining_people
guided_counts[name] = count
remaining_people -= count
# 実際に誘導実行
for name, count in guided_counts.items():
for _ in range(count):
if self.people_waiting:
person_id = self.people_waiting.popleft()
self.routes[name].add_person(person_id, self.current_time)
# 3. 各経路の更新(人の移動と避難完了)
step_evacuated = 0
for route in self.routes.values():
step_evacuated += route.update(self.current_time)
self.total_people_evacuated += step_evacuated
# 4. ログ記録 (任意)
# print(f"Time: {self.current_time}, Arrived: {arrivals}, Waiting: {len(self.people_waiting)}, Evacuated(step): {step_evacuated}")
return step_evacuated
def evaluate_and_adjust(self, period_duration=10):
"""指定期間の評価と誘導割合の調整"""
print(f"\n--- Evaluating period: Time {self.current_time - period_duration + 1} to {self.current_time} ---")
status_list = []
total_flow_through = 0
for name, route in self.routes.items():
status = route.get_status()
# この期間での避難者数を計算 (前回の評価からの差分)
prev_log = next((item for item in reversed(self.log) if item['time'] == self.current_time - period_duration), {'route_status': {n: {'total_evacuated': 0} for n in self.routes}})
status['flow_through'] = status['total_evacuated'] - prev_log['route_status'][name]['total_evacuated']
status['avg_occupancy_approx'] = status['occupancy'] # 簡単化のため期末の値を使う
status_list.append(status)
total_flow_through += status['flow_through']
status_df = pd.DataFrame(status_list).set_index('name')
print("Route Status at end of period:")
print(status_df[['occupancy', 'current_travel_time', 'flow_through', 'total_evacuated']])
# 調整ロジック:
# 評価指標:期間中のflow_through(処理能力)と期末の推定移動時間(混雑度)を考慮
# スコア = flow_through / (1 + current_travel_time) のような形で計算
# スコアが高い経路への誘導割合を増やす
scores = {}
total_score = 0
for status in status_list:
# flow_throughが0でも評価できるよう、微小値を追加
score = (status['flow_through'] + 0.1) / (1 + status['current_travel_time'])
scores[status['name']] = score
total_score += score
if total_score > 0:
new_proportions = {name: score / total_score for name, score in scores.items()}
self.guidance_proportions = new_proportions
print("\nNew Guidance Proportions:")
for name, prop in self.guidance_proportions.items():
print(f" {name}: {prop:.3f}")
else:
# スコアが全て0の場合(避難完了直後など)、均等割りに戻す
print("\nNo flow through or extreme congestion, resetting proportions to equal.")
self.guidance_proportions = {name: 1.0 / len(self.routes) for name in self.routes.keys()}
# 現在の状態をログに追加
self.log.append({
"time": self.current_time,
"total_arrived": self.total_people_arrived,
"total_evacuated": self.total_people_evacuated,
"people_waiting": len(self.people_waiting),
"guidance_proportions": self.guidance_proportions.copy(),
"route_status": {name: route.get_status() for name, route in self.routes.items()}
})
def run_full_process(self, num_periods=5, period_duration=10):
"""シミュレーション全体を実行"""
print("--- Task 1: Evacuation Guidance Simulation Start ---")
print(f"Initial Arrival Rate: {self.arrival_rate_per_min * 10} people / 10 min")
print("Initial Guidance Proportions:")
for name, prop in self.guidance_proportions.items():
print(f" {name}: {prop:.3f}")
# 初期状態をログに記録
self.log.append({
"time": 0, "total_arrived": 0, "total_evacuated": 0, "people_waiting": 0,
"guidance_proportions": self.guidance_proportions.copy(),
"route_status": {name: route.get_status() for name, route in self.routes.items()}
})
for period in range(num_periods):
print(f"\n>>> Starting Period {period + 1}/{num_periods} (Time {self.current_time + 1} to {self.current_time + period_duration}) <<<")
# period_duration分シミュレーション実行
for _ in range(period_duration):
self.run_simulation_step()
# 全員避難したら終了
if self.total_people_evacuated >= self.total_people_arrived and len(self.people_waiting) == 0 and all(r.get_status()['occupancy'] == 0 for r in self.routes.values()):
print(f"\n--- All people evacuated at Time {self.current_time} ---")
break
# 全員避難したらループを抜ける
if self.total_people_evacuated >= self.total_people_arrived and len(self.people_waiting) == 0 and all(r.get_status()['occupancy'] == 0 for r in self.routes.values()):
break
# 期間終了後、評価と調整
self.evaluate_and_adjust(period_duration)
# シミュレーション終了後、最終結果を表示
final_time = self.current_time
# もし指定期間内に終わらなければ、避難完了までシミュレーションを続ける
while not (self.total_people_evacuated >= self.total_people_arrived and len(self.people_waiting) == 0 and all(r.get_status()['occupancy'] == 0 for r in self.routes.values())):
if self.current_time > num_periods * period_duration * 2: # 無限ループ防止
print("\nWarning: Simulation timed out before full evacuation.")
final_time = self.current_time
break
self.run_simulation_step()
final_time = self.current_time
print("\n--- Simulation Finished ---")
print(f"Total time steps (minutes): {final_time}")
print(f"Total people arrived: {self.total_people_arrived}")
print(f"Total people evacuated: {self.total_people_evacuated}")
final_status = pd.DataFrame([r.get_status() for r in self.routes.values()]).set_index('name')
print("Final Route Status:")
print(final_status[['total_entered', 'total_evacuated']])
return final_time, self.log
--- 実行 ---
経路設定
route_a = EvacuationRoute("Route A", capacity=50, base_time=5, congestion_factor=0.1)
route_b = EvacuationRoute("Route B", capacity=80, base_time=8, congestion_factor=0.08)
route_c = EvacuationRoute("Route C", capacity=40, base_time=4, congestion_factor=0.15)
routes = [route_a, route_b, route_c]
エージェント初期化と実行
initial_lambda_per_10min = 60
agent1 = EvacuationAgent(routes, initial_lambda_per_10min / 10.0)
final_time_t1, log_t1 = agent1.run_full_process(num_periods=5, period_duration=10)
結果の可視化(任意)
log_df_t1 = pd.DataFrame(log_t1)
fig, axes = plt.subplots(3, 1, figsize=(12, 15), sharex=True)
避難者数推移
axes[0].plot(log_df_t1['time'], log_df_t1['total_arrived'], label='Total Arrived', color='blue')
axes[0].plot(log_df_t1['time'], log_df_t1['total_evacuated'], label='Total Evacuated', color='green')
axes[0].set_ylabel('Number of People')
axes[0].set_title('Evacuation Progress')
axes[0].legend()
axes[0].grid(True)
経路ごとの占有数
for route_name in agent1.routes.keys():
occupancy = [log['route_status'][route_name]['occupancy'] for log in log_t1]
axes[1].plot(log_df_t1['time'], occupancy, label=f'{route_name} Occupancy')
axes[1].set_ylabel('Occupancy')
axes[1].set_title('Route Occupancy Over Time')
axes[1].legend()
axes[1].grid(True)
誘導割合の変化
proportions_data = {name: [] for name in agent1.routes.keys()}
for log_entry in log_t1:
for name, prop in log_entry['guidance_proportions'].items():
proportions_data[name].append(prop)
for name, props in proportions_data.items():
axes[2].plot(log_df_t1['time'], props, label=f'{name} Proportion', marker='o', linestyle='--')
axes[2].set_xlabel('Time (minutes)')
axes[2].set_ylabel('Guidance Proportion')
axes[2].set_title('Agent Guidance Proportion Adjustment')
axes[2].legend()
axes[2].grid(True)
plt.tight_layout()
plt.show()
IGNORE_WHEN_COPYING_START
content_copy
download
Use code with caution.
Python
IGNORE_WHEN_COPYING_END
解説 (課題1):
EvacuationRouteクラスで各経路の状態(経路内の人数、避難完了者数など)と、混雑度に応じた移動時間の計算、人の追加・移動処理を管理します。
EvacuationAgentクラスが全体のシミュレーションを制御します。
run_simulation_stepメソッドで1分ごとのシミュレーション(到着、誘導、移動)を行います。
evaluate_and_adjustメソッドがエージェントの「判断」部分です。10分間のシミュレーション後、各経路の処理能力(期間中の避難完了者数)と混雑度(推定移動時間)を評価します。ここでは簡単なスコアリング(処理能力 / (1 + 推定移動時間))で経路の「良さ」を評価し、そのスコアに基づいて次の10分間の誘導割合を動的に変更します。混雑して遅い経路への誘導を減らし、比較的空いていて速い経路への誘導を増やします。
run_full_processでこの「シミュレーション→評価→調整」のサイクルを5回(計50分)繰り返します。
最終的に、全員が避難完了するまでの総時間を最小化することを目指します。シミュレーション結果として、総避難時間、到着者数、避難者数が表示されます。グラフでは、避難の進捗、各経路の混雑具合、エージェントによる誘導割合の変化が可視化され、動的な判断が行われていることが確認できます。
課題2:オンライン広告最適化エージェント
目標: 総クリック数を最大化する。
環境:
広告数:5つとする。
各広告の真のCTR:ベルヌーイ分布のパラメータp。初期値として 0.05 から 0.15 の間でランダムに設定(エージェントは知らない)。
表示回数:1日あたり10,000回とする。
ユーザー行動変化:簡単化のため、今回は真のCTRは不変とし、エージェントが未知のCTRを学習する問題(Multi-Armed Bandit問題)として扱います。
エージェントの行動: 24時間(1サイクル)ごとに各広告のクリック率(実績CTR)を評価し、次の24時間の広告表示配分戦略を調整する。UCB1 (Upper Confidence Bound 1) アルゴリズムを使用します。
実装:
--- 課題2: オンライン広告最適化エージェント ---
class Ad:
def init(self, name, true_ctr):
self.name = name
self.true_ctr = true_ctr # エージェントはこれを知らない
self.impressions = 0
self.clicks = 0
def simulate_impression(self):
"""1回の表示をシミュレーションし、クリックされたか返す"""
self.impressions += 1
if random.random() < self.true_ctr:
self.clicks += 1
return 1 # Click
else:
return 0 # No click
def get_observed_ctr(self):
"""観測されたCTRを返す"""
if self.impressions == 0:
return 0.0
return self.clicks / self.impressions
def get_status(self):
return {
"name": self.name,
"impressions": self.impressions,
"clicks": self.clicks,
"observed_ctr": self.get_observed_ctr(),
# "true_ctr": self.true_ctr # デバッグ用、通常エージェントはアクセス不可
}
class AdOptimizationAgent:
def init(self, ads, impressions_per_cycle, exploration_factor=2.0):
self.ads = {ad.name: ad for ad in ads}
self.impressions_per_cycle = impressions_per_cycle
self.exploration_factor = exploration_factor # UCB1の探索強度 C
self.total_impressions = 0
self.cycle = 0
self.log = []
# 初期配分は均等
self.allocation = {name: 1.0 / len(ads) for name in self.ads.keys()}
def _calculate_ucb_scores(self):
"""各広告のUCB1スコアを計算"""
scores = {}
total_impressions_overall = max(1, self.total_impressions) # log(0)回避
for name, ad in self.ads.items():
if ad.impressions == 0:
# まだ表示されていない広告は最優先で試す
scores[name] = float('inf')
else:
observed_ctr = ad.get_observed_ctr()
exploration_bonus = self.exploration_factor * math.sqrt(math.log(total_impressions_overall) / ad.impressions)
scores[name] = observed_ctr + exploration_bonus
return scores
def choose_allocation(self):
"""UCBスコアに基づいて次のサイクルの表示配分を決定"""
self.cycle += 1
print(f"\n--- Cycle {self.cycle}: Choosing Allocation ---")
if self.cycle == 1:
# 初回は均等割当
print("Initial cycle: Using equal allocation.")
self.allocation = {name: 1.0 / len(self.ads) for name in self.ads.keys()}
return self.allocation
# UCBスコア計算
ucb_scores = self._calculate_ucb_scores()
print("UCB Scores:")
for name, score in ucb_scores.items():
print(f" {name}: {score:.4f}")
# 最もスコアの高い広告を選択 (Winner-takes-all戦略も考えられるが、ここでは単純化のためUCBスコアを元に配分はしない)
# UCBの典型的な実装では、各ステップで最もスコアの高いアームを引く。
# この課題ではサイクルごとに配分を決めるため、最もスコアの高い広告に全表示を割り当てる戦略をとる。
# (より洗練させるなら、スコアに応じて配分するソフトマックス戦略などもある)
best_ad_name = max(ucb_scores, key=ucb_scores.get)
print(f"Highest UCB score: {best_ad_name}. Allocating all impressions to it for the next cycle.")
# best_ad_nameに100%割り当て
new_allocation = {name: 0.0 for name in self.ads.keys()}
new_allocation[best_ad_name] = 1.0
self.allocation = new_allocation
# ログ記録用に現在の状態を追加
self.log_status()
return self.allocation
def run_simulation_cycle(self):
"""1サイクル(24時間)のシミュレーションを実行"""
print(f"--- Running Simulation for Cycle {self.cycle} ---")
print("Allocation:", {k: f"{v:.2f}" for k, v in self.allocation.items()})
cycle_clicks = 0
cycle_impressions = {name: 0 for name in self.ads.keys()}
# 配分に基づいてインプレッションを割り当ててシミュレーション
allocated_impressions = {name: int(self.impressions_per_cycle * prop) for name, prop in self.allocation.items()}
# 丸め誤差を調整
diff = self.impressions_per_cycle - sum(allocated_impressions.values())
if diff > 0:
# 最も配分が多い広告に追加
most_allocated_ad = max(self.allocation, key=self.allocation.get)
allocated_impressions[most_allocated_ad] += diff
for name, count in allocated_impressions.items():
ad = self.ads[name]
for _ in range(count):
click = ad.simulate_impression()
cycle_clicks += click
cycle_impressions[name] += count
self.total_impressions += count
print(f"Cycle {self.cycle} Results: Total Clicks = {cycle_clicks}")
print("Impressions this cycle:", cycle_impressions)
# print("Current Ad Status:")
# print(pd.DataFrame([ad.get_status() for ad in self.ads.values()]).set_index('name')[['impressions', 'clicks', 'observed_ctr']])
def log_status(self):
"""現在の状態をログに記録"""
status = {
"cycle": self.cycle,
"total_impressions": self.total_impressions,
"total_clicks": sum(ad.clicks for ad in self.ads.values()),
"allocation": self.allocation.copy(),
"ad_status": {name: ad.get_status() for name, ad in self.ads.items()}
}
# UCBスコアも記録(決定後のため、次のサイクルの決定に使われるスコア)
if self.cycle > 0:
status["ucb_scores"] = self._calculate_ucb_scores()
self.log.append(status)
def run_full_process(self, num_cycles=5):
"""シミュレーション全体を実行"""
print("\n--- Task 2: Ad Optimization Simulation Start ---")
print(f"Impressions per cycle: {self.impressions_per_cycle}")
print(f"Exploration Factor (C): {self.exploration_factor}")
print("True CTRs (Unknown to agent):")
for ad in self.ads.values():
print(f" {ad.name}: {ad.true_ctr:.4f}")
self.log_status() # 初期状態(cycle 0)
for i in range(num_cycles):
# 1. 次の配分を決定 (評価と判断)
self.choose_allocation()
# 2. 決定した配分でシミュレーションを実行
self.run_simulation_cycle()
# 最終状態をログに追加
self.log_status()
print("\n--- Simulation Finished ---")
final_status_df = pd.DataFrame([ad.get_status() for ad in self.ads.values()]).set_index('name')
print("Final Ad Status:")
print(final_status_df[['impressions', 'clicks', 'observed_ctr']])
total_clicks = sum(ad.clicks for ad in self.ads.values())
print(f"\nTotal Clicks over {num_cycles} cycles: {total_clicks}")
return total_clicks, self.log
--- 実行 ---
広告設定 (真のCTRはエージェントには未知)
np.random.seed(42) # 再現性のため
ads_t2 = [
Ad("Ad 1", np.random.uniform(0.05, 0.15)),
Ad("Ad 2", np.random.uniform(0.05, 0.15)),
Ad("Ad 3", np.random.uniform(0.05, 0.15)),
Ad("Ad 4", np.random.uniform(0.05, 0.15)),
Ad("Ad 5", np.random.uniform(0.05, 0.15))
]
エージェント初期化と実行
agent2 = AdOptimizationAgent(ads_t2, impressions_per_cycle=10000, exploration_factor=1.5) # C=1.5程度に設定
total_clicks_t2, log_t2 = agent2.run_full_process(num_cycles=5)
結果の可視化(任意)
log_df_t2 = pd.DataFrame(log_t2)
fig, axes = plt.subplots(3, 1, figsize=(12, 15), sharex=True)
総クリック数の推移
axes[0].plot(log_df_t2['cycle'], log_df_t2['total_clicks'], marker='o', label='Total Clicks')
axes[0].set_ylabel('Total Clicks')
axes[0].set_title('Cumulative Clicks Over Cycles')
axes[0].grid(True)
各広告の観測CTR推移
for ad_name in agent2.ads.keys():
observed_ctrs = [log['ad_status'][ad_name]['observed_ctr'] if log['ad_status'][ad_name]['impressions'] > 0 else 0 for log in log_t2]
true_ctr = agent2.ads[ad_name].true_ctr
axes[1].plot(log_df_t2['cycle'], observed_ctrs, marker='.', linestyle='--', label=f'{ad_name} Observed CTR')
axes[1].axhline(y=true_ctr, color=plt.gca().lines[-1].get_color(), linestyle=':', alpha=0.7, label=f'{ad_name} True CTR')
axes[1].set_ylabel('Observed CTR')
axes[1].set_title('Observed vs True CTR Over Cycles')
axes[1].legend(ncol=2)
axes[1].grid(True)
広告配分の変化
allocation_data = {name: [] for name in agent2.ads.keys()}
cycle 0 の alloc は均等なのでスキップしない
for log_entry in log_t2:
# cycle X の allocation は cycle X+1 で使われるもの
alloc = log_entry.get('allocation', {}) # cycle 0にはallocがない場合があるのでget
for name in agent2.ads.keys():
allocation_data[name].append(alloc.get(name, 0.0))
cycles_for_alloc = log_df_t2['cycle'].tolist() # X軸用
for name, allocs in allocation_data.items():
axes[2].plot(cycles_for_alloc, allocs, marker='o', linestyle='--', label=f'{name} Allocation')
axes[2].set_xlabel('Cycle (Decision Point)')
axes[2].set_ylabel('Impression Allocation')
axes[2].set_title('Agent Allocation Strategy Adjustment (Allocation for next cycle)')
axes[2].legend(ncol=2)
axes[2].grid(True)
axes[2].set_xticks(cycles_for_alloc) # サイクル番号を X 軸に表示
plt.tight_layout()
plt.show()
IGNORE_WHEN_COPYING_START
content_copy
download
Use code with caution.
Python
IGNORE_WHEN_COPYING_END
解説 (課題2):
Adクラスが各広告の状態(表示回数、クリック数)を管理し、クリックのシミュレーションを行います。真のCTR (true_ctr) は保持しますが、エージェントはアクセスできません。
AdOptimizationAgentクラスがエージェントとして動作します。
_calculate_ucb_scoresメソッドで、UCB1アルゴリズムに基づき各広告のスコアを計算します。スコアは「観測されたCTR(実績)」+「探索ボーナス(まだ試行回数が少ない広告を優遇)」で構成され、Exploitation(活用)とExploration(探索)のバランスを取ります。
choose_allocationメソッドがエージェントの「判断」です。サイクル開始時にUCBスコアを計算し、最もスコアの高い広告に次のサイクルの全インプレッションを割り当てる戦略を取ります(UCBの基本的な適用方法)。
run_simulation_cycleメソッドで、決定された配分に基づき1サイクル分の広告表示とクリックをシミュレーションします。
run_full_processで、「配分決定(評価・判断)→シミュレーション」のサイクルを5回繰り返します。
最終的に、期間中の総クリック数を最大化することを目指します。結果として総クリック数が表示され、グラフではクリック数の累積、各広告の観測CTRが真のCTRに収束していく様子、そしてエージェントが有望な広告(真のCTRが高い広告)へ表示を集中させていく動的な配分調整の様子が可視化されます。
課題3:感染症拡大防止エージェント
目標: 感染者数と経済的損失のバランスを最適化する(目的関数を最小化)。
環境:
感染モデル:SIRモデル(Susceptible, Infected, Recovered)を離散時間ステップでシミュレーション。
パラメータ(例):
総人口 N = 1,000,000
初期感染者 I0 = 100, 初期回復者 R0 = 0, S0 = N - I0 - R0
基本再生産数 R0_base = 2.5 (対策なしの場合)
平均感染期間 D = 14日 (回復率 gamma = 1/D)
基本感染率 beta_base = R0_base * gamma
対策:4段階(0: なし, 1: 軽度, 2: 中度, 3: 強度)
効果(beta乗数): effectiveness = [1.0, 0.8, 0.5, 0.2] (betaをこの値で乗算)
経済コスト(日あたり): cost = [0, 10, 50, 200] (単位は抽象的)
目的関数:総感染ピーク値 + weight * 総経済コスト を最小化 (ここではピーク値と総コストを評価)。weightはトレードオフ係数。
エージェントの行動: 7日間ごとに感染状況(特に現在の感染者数 I とその変化傾向)を評価する。次の7日間の最適な対策レベルを、各対策レベルを適用した場合の短期シミュレーション予測に基づいて決定する。
実装:
--- 課題3: 感染症拡大防止エージェント ---
class SIRModel:
def init(self, N, I0, R0, beta_base, gamma):
self.N = N
self.S = N - I0 - R0
self.I = I0
self.R = R0
self.beta_base = beta_base
self.gamma = gamma
self.time = 0
self.history = [{'time': 0, 'S': self.S, 'I': self.I, 'R': self.R}]
def step(self, intervention_effectiveness):
"""1日分のSIRモデル更新"""
if self.I == 0: # 感染者がいなければ変化なし
new_S, new_I, new_R = self.S, self.I, self.R
else:
beta_effective = self.beta_base * intervention_effectiveness
# 離散時間での変化を計算
new_infections = (beta_effective * self.S * self.I) / self.N
new_recoveries = self.gamma * self.I
# 値が負にならないように調整
new_infections = min(self.S, new_infections) # S人以上は感染しない
new_recoveries = min(self.I, new_recoveries) # I人以上は回復しない
new_S = self.S - new_infections
new_I = self.I + new_infections - new_recoveries
new_R = self.R + new_recoveries
# 整数に丸める (任意、人口が多いので小数でも良い場合も)
# new_S, new_I, new_R = round(new_S), round(new_I), round(new_R)
# 整合性チェック (合計がNになるように調整, Iを優先)
# current_total = new_S + new_I + new_R
# diff = self.N - current_total
# new_S += diff # Sで調整
self.S, self.I, self.R = new_S, new_I, new_R
self.time += 1
self.history.append({'time': self.time, 'S': self.S, 'I': self.I, 'R': self.R})
return self.S, self.I, self.R
def run_simulation(self, days, intervention_effectiveness):
"""指定日数、指定効果でシミュレーションを実行"""
initial_state = {'S': self.S, 'I': self.I, 'R': self.R, 'time': self.time}
temp_history = [initial_state.copy()]
current_S, current_I, current_R = self.S, self.I, self.R
current_time = self.time
for _ in range(days):
if current_I == 0: break # 感染者ゼロなら終了
beta_eff = self.beta_base * intervention_effectiveness
infections = (beta_eff * current_S * current_I) / self.N
recoveries = self.gamma * current_I
infections = min(current_S, infections)
recoveries = min(current_I, recoveries)
next_S = current_S - infections
next_I = current_I + infections - recoveries
next_R = current_R + recoveries
current_S, current_I, current_R = next_S, next_I, next_R
current_time += 1
temp_history.append({'time': current_time, 'S': current_S, 'I': current_I, 'R': current_R})
return temp_history
def get_state(self):
return {'S': self.S, 'I': self.I, 'R': self.R, 'time': self.time}
class InfectionControlAgent:
def init(self, sir_model, intervention_levels, cost_per_day, evaluation_weight, prediction_horizon=7):
self.sir_model = sir_model
# intervention_levels = {0: 1.0, 1: 0.8, 2: 0.5, 3: 0.2} # effectiveness
# cost_per_day = {0: 0, 1: 10, 2: 50, 3: 200}
self.intervention_levels = intervention_levels
self.cost_per_day = cost_per_day
self.evaluation_weight = evaluation_weight # 経済コストの重み
self.prediction_horizon = prediction_horizon # 先読みシミュレーション期間
self.current_intervention_level = 0 # 初期値は対策なし
self.log = []
self.total_cost = 0
def evaluate_and_decide(self):
"""現状評価と次の対策レベルの決定"""
current_state = self.sir_model.get_state()
print(f"\n--- Evaluating at Day {current_state['time']} ---")
print(f"Current State: S={current_state['S']:.0f}, I={current_state['I']:.0f}, R={current_state['R']:.0f}")
print(f"Current Intervention Level: {self.current_intervention_level}")
# 各対策レベルでの短期予測を実行
predictions = {}
for level, effectiveness in self.intervention_levels.items():
# 現在の状態から予測を開始
temp_model = SIRModel(N=self.sir_model.N,
I0=current_state['I'],
R0=current_state['R'],
beta_base=self.sir_model.beta_base,
gamma=self.sir_model.gamma)
# 初期時刻を合わせる
temp_model.time = current_state['time']
temp_model.S = current_state['S']
predicted_history = temp_model.run_simulation(self.prediction_horizon, effectiveness)
# 予測期間中の最大感染者数とコストを評価
peak_I_pred = max(h['I'] for h in predicted_history) if predicted_history else current_state['I']
cost_pred = self.cost_per_day[level] * self.prediction_horizon
# 目的関数値(予測)
objective_pred = peak_I_pred + self.evaluation_weight * cost_pred
predictions[level] = {
"predicted_peak_I": peak_I_pred,
"predicted_cost": cost_pred,
"predicted_objective": objective_pred,
"final_I_pred": predicted_history[-1]['I'] if predicted_history else current_state['I']
}
print("\nShort-term Predictions (Next 7 days):")
pred_df = pd.DataFrame.from_dict(predictions, orient='index')
print(pred_df[['predicted_peak_I', 'predicted_cost', 'predicted_objective', 'final_I_pred']])
# 目的関数(予測値)を最小化する対策レベルを選択
best_level = min(predictions, key=lambda k: predictions[k]['predicted_objective'])
print(f"\nDecision: Choose Intervention Level {best_level} for the next {self.prediction_horizon} days.")
self.current_intervention_level = best_level
# ログ記録
self.log_status(predictions)
def run_simulation_period(self, days):
"""決定された対策レベルで指定期間シミュレーションを実行"""
print(f"--- Running Simulation: Day {self.sir_model.time + 1} to {self.sir_model.time + days} with Level {self.current_intervention_level} ---")
effectiveness = self.intervention_levels[self.current_intervention_level]
period_cost = 0
for _ in range(days):
if self.sir_model.I <= 0: # 感染終息なら終了
print(f"Infection eradicated at day {self.sir_model.time}.")
break
self.sir_model.step(effectiveness)
period_cost += self.cost_per_day[self.current_intervention_level]
self.total_cost += period_cost
print(f"End of period: Day {self.sir_model.time}, I={self.sir_model.I:.0f}. Cost for period: {period_cost}")
def log_status(self, predictions=None):
"""現在の状態と決定をログに記録"""
state = self.sir_model.get_state()
log_entry = {
"time": state['time'],
"S": state['S'],
"I": state['I'],
"R": state['R'],
"intervention_level": self.current_intervention_level,
"total_cost": self.total_cost,
"predictions": predictions # 意思決定時の予測結果も記録
}
self.log.append(log_entry)
def run_full_process(self, num_periods=4, period_duration=7):
"""シミュレーション全体を実行"""
print("\n--- Task 3: Infection Control Simulation Start ---")
print(f"Population (N): {self.sir_model.N}")
print(f"Initial State: S={self.sir_model.S:.0f}, I={self.sir_model.I:.0f}, R={self.sir_model.R:.0f}")
print("Intervention Levels & Costs:")
for level in self.intervention_levels.keys():
print(f" Level {level}: Effectiveness={self.intervention_levels[level]:.2f}, Cost/day={self.cost_per_day[level]}")
print(f"Objective: Minimize Peak_I + {self.evaluation_weight} * Total_Cost")
# 初期状態ログ
self.log_status()
for period in range(num_periods):
# 1. 評価と次の対策決定
self.evaluate_and_decide()
# 2. 決定した対策でシミュレーション実行
self.run_simulation_period(period_duration)
if self.sir_model.I <= 0:
break # 感染終息
# 最終状態ログ
self.log_status()
print("\n--- Simulation Finished ---")
final_state = self.sir_model.get_state()
print(f"Final State at Day {final_state['time']}: S={final_state['S']:.0f}, I={final_state['I']:.0f}, R={final_state['R']:.0f}")
print(f"Total Economic Cost: {self.total_cost:.0f}")
# 期間中のピーク感染者数を計算
peak_I = max(entry['I'] for entry in self.sir_model.history)
print(f"Peak Infected during simulation: {peak_I:.0f}")
# 最終的な目的関数値(実績)
final_objective = peak_I + self.evaluation_weight * self.total_cost
print(f"Final Objective Value (Peak_I + weight*Cost): {final_objective:.2f}")
return peak_I, self.total_cost, self.log, self.sir_model.history
--- 実行 ---
パラメータ設定
N_t3 = 1_000_000
I0_t3 = 100
R0_t3 = 0
R0_base_t3 = 2.5
gamma_t3 = 1/14.0 # 平均感染期間14日
beta_base_t3 = R0_base_t3 * gamma_t3
intervention_effectiveness_t3 = {0: 1.0, 1: 0.8, 2: 0.5, 3: 0.3} # 効果を少し調整
intervention_cost_t3 = {0: 0, 1: 20, 2: 80, 3: 250} # コストを少し調整
eval_weight_t3 = 0.1 # 経済コストの重み (この値で結果が大きく変わる)
モデルとエージェント初期化
sir_model_t3 = SIRModel(N_t3, I0_t3, R0_t3, beta_base_t3, gamma_t3)
agent3 = InfectionControlAgent(sir_model_t3,
intervention_effectiveness_t3,
intervention_cost_t3,
eval_weight_t3,
prediction_horizon=7)
シミュレーション実行
peak_I_t3, total_cost_t3, log_t3, history_t3 = agent3.run_full_process(num_periods=4, period_duration=7)
結果の可視化(任意)
history_df_t3 = pd.DataFrame(history_t3)
log_df_t3 = pd.DataFrame(log_t3)
fig, axes = plt.subplots(3, 1, figsize=(12, 15), sharex=True)
SIR曲線
axes[0].plot(history_df_t3['time'], history_df_t3['S'], label='Susceptible (S)', color='blue')
axes[0].plot(history_df_t3['time'], history_df_t3['I'], label='Infected (I)', color='red')
axes[0].plot(history_df_t3['time'], history_df_t3['R'], label='Recovered (R)', color='green')
axes[0].set_ylabel('Number of People')
axes[0].set_title('SIR Model Simulation with Agent Control')
axes[0].legend()
axes[0].grid(True)
ピーク値を表示
peak_time = history_df_t3.loc[history_df_t3['I'].idxmax(), 'time']
axes[0].scatter(peak_time, peak_I_t3, color='red', zorder=5, label=f'Peak I: {peak_I_t3:.0f}')
axes[0].legend()
対策レベルの変化
ログの時間に合わせてプロット
times = log_df_t3['time'].tolist()
levels = log_df_t3['intervention_level'].tolist()
各期間の開始時刻とレベルを取得
plot_times = []
plot_levels = []
for i in range(len(times)):
start_time = times[i]
level = levels[i]
# 次のログエントリの時刻(期間終了時刻+1)まで、またはシミュレーション終了まで同じレベル
end_time = times[i+1] if i+1 < len(times) else history_df_t3['time'].max()
# 期間の開始と終了直前で点を打つことで階段状にする
plot_times.extend([start_time, end_time])
plot_levels.extend([level, level])
重複削除とソート
plot_data = sorted(list(set(zip(plot_times, plot_levels))))
plot_times_unique = [t for t, l in plot_data]
plot_levels_unique = [l for t, l in plot_data]
axes[1].plot(log_df_t3['time'], log_df_t3['intervention_level'], marker='o', linestyle='--', drawstyle='steps-post')
axes[1].plot(plot_times_unique, plot_levels_unique, marker='o', drawstyle='steps-post')
axes[1].set_ylabel('Intervention Level')
axes[1].set_title('Agent Intervention Level Adjustment Over Time')
axes[1].set_yticks(list(intervention_effectiveness_t3.keys()))
axes[1].grid(True)
累積コスト
axes[2].plot(log_df_t3['time'], log_df_t3['total_cost'], marker='.', label='Cumulative Economic Cost')
axes[2].set_xlabel('Time (Days)')
axes[2].set_ylabel('Total Cost')
axes[2].set_title('Cumulative Economic Cost Over Time')
axes[2].legend()
axes[2].grid(True)
plt.xlim(left=0)
plt.tight_layout()
plt.show()
IGNORE_WHEN_COPYING_START
content_copy
download
Use code with caution.
Python
IGNORE_WHEN_COPYING_END
解説 (課題3):
SIRModelクラスでSIRモデルの基本状態と更新ロジックを管理します。stepメソッドで1日分の更新、run_simulationで指定期間・指定効果でのシミュレーションを実行します(これはエージェントの予測に使われます)。
InfectionControlAgentクラスがエージェントです。
evaluate_and_decideメソッドが中核的な「判断」部分です。
現在の感染状況(S, I, R)を取得します。
利用可能な各対策レベル(0〜3)について、もしその対策を次の7日間適用したらどうなるか、SIRModel.run_simulationを使って短期予測を行います。
各予測結果から、予測期間中のピーク感染者数(predicted_peak_I)と経済コスト(predicted_cost)を計算します。
これらを用いて目的関数 (predicted_peak_I + weight * predicted_cost) の予測値を計算します。
目的関数の予測値を最も小さくする対策レベルを、次の7日間の対策として選択します。これが「シミュレーションに基づく動的な意思決定」です。
run_simulation_periodメソッドで、決定された対策レベルを用いて実際のSIRモデルを7日間進めます。
run_full_processで、「評価・予測・決定→シミュレーション実行」のサイクルを4回(計28日)繰り返します。
最終的に、シミュレーション期間全体のピーク感染者数と総経済コスト、およびそれらから計算される目的関数の実績値が表示されます。グラフでは、エージェントの介入によって制御されたSIR曲線、動的に変更される対策レベル、累積する経済コストが可視化されます。evaluation_weightの値を変えることで、感染抑制と経済維持のどちらを重視するかが変わり、エージェントの行動(選択する対策レベル)も変化します。
まとめと評価ポイント
これらの課題を通じて、以下の点が実装されました。
継続的な状況再評価・再判断: 各エージェントは、一定期間ごとにシミュレーション結果(混雑状況、CTR実績、感染状況)を評価し、その評価に基づいて次の行動(誘導割合、広告配分、対策レベル)を決定・変更しています。
連続的な改善プロセス: 初期設定や単純な戦略から開始し、シミュレーションと評価を繰り返す中で、エージェントはより最適(目標達成度が高い)な戦略へと自己調整していきます。これは単一のコード生成(例:静的な最適化計算)では実現できません。
シミュレーション結果の分析と戦略的対応:
課題1では、経路の流量と混雑度を分析し、効率の良い経路へ誘導を集中させます。
課題2では、UCB1アルゴリズムを用いてCTR実績と試行回数を分析し、探索と活用のバランスを取りながら最適な広告を見つけ出します。
課題3では、複数の未来(各対策レベルを適用した場合)をシミュレーションで予測・比較し、目的関数を最小化する戦略的な選択を行います。
これらの実装は、Pythonを用いてシミュレーション環境とエージェントの意思決定ロジックを構築し、それらを繰り返し実行することで、動的な環境下での継続的な最適化行動を示すものです。
このように命令しているにもかかわらずAgenticな対応ではなく実装をしてくるようになっています。
ただ
このように言うと
ちゃんと内容を理解していそうです。ですが...
こういう感じで命令しても通りません
つまり継続してツールを使う能力がGPT-4.5やClaudeと比べてありません。
これはClineのツール呼び出しなどを継続的に行うのが難しいというのと近いものがあります。
(ここからは推測ですが)
GoogleがSWE-Lancerを使わずにSWE-bench verfiedを使ったのは
このような指示追従能力にまだ欠けているということだと思います。
(ちなみにこの後指示がめちゃくちゃになって何にも伝わらなくなりました)
Discussion