プログラムの言語変換 & ローカル依存ファイルの集約を行うワークフロー(LangGraph・並列処理)
はじめに
こちらは何かに使えるかも?と実験的に作ってみたものですが、せっかくなので公開してみます。
主に以下の要素が含まれています。ご興味ある方は、LangGraphを使ったワークフローのサンプルとして(使えるかどうかは置いといて・・・)ご覧ください。
- プログラムの変換(集約)
- 並列・非同期処理
- ローカル依存ファイル(importなどで呼び出しを行っているローカルファイル)の探索
作業環境
OS: WSL2 Ubuntu22.04
Pythonバージョン: 3.10.12
主な内容
どういったことを行うのか、処理内容をざっくり説明するとフォルダ内にある全てのプログラムに対して言語変換(python → javaなど)と詳細設計書の作成を行います。その際にプログラム内でローカル依存ファイルがあった場合(呼び出し階層が複数またはネストしている場合でも)、それらの内容をまとめて一つのプログラムにします。
依存ファイルの集約を行う主なメリットとしては複数の言語(複数のファイル)で書かれたシステムを一つの言語(一つのファイル)に統一出来ることです。
一つのファイルにすることで実行を容易にして、自分が読みやすいコードで完結させることができます。(個人的にはコード量が少ない場合、まとまっていた方がコードが読み易いから。。。というのもあります。)
デメリットとしては、複数の機能が一つになりコードが長くなると保守性が低下してカスタマイズも行いづらくなることです。
また、llmやコード量にもよりますが最大トークン数をオーバーして集約したコードが生成しきれないということもあります。
そのため、個人で作成したような小規模なツールを対象として試すのが良いかと思います。
今後改修しないのであれば(例えばpythonに変換する場合)変換後にpyinstallerを使いexe化してしまうのも面白いかもしれません。
処理の流れ
処理の主な流れは以下のようになります。
- 指定したフォルダ内にあるファイルの中身をllmで解析してローカル依存ファイルを読み取ります。この際に依存関係が複数、またはネストしている場合も探索して取得していきます。
- ファイルに対応するローカル依存ファイルを一つにして詳細設計書を作成します。
- 詳細設計書を指定した言語の詳細設計書に変換します。
- 変換した詳細設計書からプログラムを生成します。
- 変換した詳細設計書と生成したプログラムを保存します。
上記が主な流れです。
1~4はパラメータで指定した数、並列(非同期)で処理されます(並列数3の場合は3ファイルずつ処理)。
4でプログラム生成後、未処理のファイルが残っている場合は1に戻ります。
ワークフロー図とノードの一覧
以下は実際に作成したLangGraphのワークフロー図とノードの一覧です。
- ワークフロー図
名前にparallelがついているノードが並列ノードです。図では一つのノードになっていますが実際には複数(パラメータで指定した並列数分)存在します。
また、図の分岐に「skip」となっている箇所があります。これは処理対象のタスクが完了せずに次のノードに進むのを防ぐためです。
「parallel_create_code」ノードから次のノードの呼び出しが分裂して行われるため、「check_task_completion」ノードで処理対象のタスクが完了しているかを判定して未完了状態の場合は「skip」として一旦「_end_」ノードに行くようにしています。
- ノードの一覧
ノード | 説明 |
---|---|
_start_ | 開始 |
read_dir | 処理対象となる全ファイルを取得する |
read_file | 処理対象ファイルを読み込む |
parallel_document_massege | llmに渡す詳細設計書作成メッセージを生成する(並列) |
parallel_create_document | 詳細設計書を作成、またはツールの呼出し用メッセージを作成する(並列) |
parallel_search_dependence_tool | 依存ファイルの検索ツールを呼び出す(並列) |
parallel_convert_document | 詳細設計書を指定された言語用に変換する(並列) |
parallel_create_code | コードを生成する(並列) |
check_task_completion | 処理対象ファイルのタスク完了を判定する |
move_bac | 元のファイルをバックアップする |
save_files | 変換された詳細設計書とコードを保存する |
_end_ | 終了 |
上記のとおり、「parallel_document_massege」~ 「parallel_create_code」までが並列で処理されるノードです。ある程度ノードをまとめてしまってもよいですが、処理ごとに分けておくとノードの付け替えなどでカスタマイズがしやすいかと思います。
コーディング
使用したライブラリ
コード内で使用した外部ライブラリとインストールコマンドは以下です。
$ pip install chardet==5.2.0
$ pip install aiofiles==23.2.1
$ pip install ipython==8.27.0
$ pip install langchain-core==0.3.28
$ pip install langchain-anthropic==0.3.0
$ pip install langgraph==0.2.60
全体のコード
import os
import re
import shutil
import chardet
import asyncio
import aiofiles
import operator
import time
from IPython.display import Image
from typing import TypedDict, Annotated
from langchain_core.messages import BaseMessage
from langchain.prompts.chat import ChatPromptTemplate
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_anthropic import ChatAnthropic
from langgraph.graph import StateGraph, END
from langchain_core.tools import BaseTool
from langgraph.prebuilt import ToolNode
from langgraph.types import Send
# llmのAPIキー
api_key=""
# 並列処理数
parallel_num = 3
# 変換先の言語
convert_language = "python"
# 処理対象ファイルの格納パス
read_dir_path = "./code_storage"
# 処理対象ファイルの拡張子
input_ext_list = ["go", "pro"]
# 処理対象外のファイル名・フォルダ名を設定
no_processe_list = ["bac", "example", "game", "board"]
# 出力ファイルの拡張子マッピング
output_ext_dict = {"python": "py"}
# LangGraphのワークフロー図を保存するパス
save_graph_img_path = "./graph_image.png"
# llmインスタンス
llm = ChatAnthropic(
model_name="claude-3-5-sonnet-latest",
max_tokens_to_sample=8192,
api_key=api_key
)
# 依存ファイルの検索を行う際の深さ制限
max_depth = 5
# 依存ファイル検索の並列処理数を制限
semaphore = asyncio.Semaphore(parallel_num)
# 依存ファイルを検索するツール
class SearchDependenceTool(BaseTool):
name: str = "search_dependence_tool"
description: str = """
対象のプログラムの中で依存している(呼び出しを行っている)ローカルファイルのプログラムがある場合、このツールにそのローカルファイルの絶対パスを与えます(複数可)。
このツールは与えられた絶対パスからファイルの中身を返します。
Args:
file_path_list (List[str]): 依存しているローカルファイルのパスのリスト
Returns:
str: 以下のフォーマットで整形されたファイル内容を含む文字列を返します。
File: [ファイルパス]
```
[ファイル内容]
```
エラーが発生した場合は、エラーメッセージが内容として含まれます。"""
# 同期処理メソッド(非対応)
def _run(self, file_path_list: list[str]):
raise NotImplementedError("同期処理には非対応")
# 非同期処理メソッド
async def _arun(self, file_path_list: list[str]):
# 依存ファイルのパスと中身を格納する文字列
result = ""
# 依存ファイル内のパスリストと深さを格納するキュー
task_queue = asyncio.Queue()
# 処理済みのファイルパスを格納するセット
processed_file_path_set = set()
# キューに初期値を設定
await task_queue.put((file_path_list, 1))
# キューが空になるまで処理を継続
while not task_queue.empty():
# キューからファイルパスリストと深さを取得
file_path_list, current_depth = await task_queue.get()
for path in file_path_list:
# すでに処理済みのファイルはスキップ
if path in processed_file_path_set:
continue
# 処理済みのパスとして格納
processed_file_path_set.add(path)
# ファイルのパスと中身をresultに追加
result += f"File: {path}\n"
result += "```" + "\n"
content = ""
try:
async with aiofiles.open(path, "r", encoding="utf-8") as f:
content = await f.read()
result += content.strip() + "\n"
except Exception as e:
result += f"Error reading file: {str(e)}\n"
result += "```" + "\n\n"
continue
result += "```" + "\n\n"
# 深さの制限に達したファイルは処理をスキップ
if current_depth >= max_depth:
continue
# semaphore_invokeメソッドを呼び出して依存ファイルの検索を実行
response = await self.semaphore_invoke(path, content)
# 依存ファイルが確認された場合、キューにファイルのパスを追加
if response.tool_calls:
file_path_list = response.tool_calls[0]["args"]["file_path_list"]
await task_queue.put((file_path_list, current_depth+1))
return result
# 依存ファイル検索するメソッド
async def semaphore_invoke(self, path, content):
# セマフォで並列処理数を制限
async with semaphore:
human_message = """
プログラム内で依存している(呼び出しを行っている)ローカルファイルがある場合、ツールを呼び出してください。
# プログラムが配置されているパス
{path}
(上記のパスは依存ファイルのパスではありません)
# プログラム(以下のプログラムから呼び出しを行っているファイルがあるか確認します)
{code}"""
prompt = ChatPromptTemplate.from_messages(
[
("human", human_message),
]
)
message_list = prompt.format_messages(
path=path,
code=content
)
response = await llm_tool.ainvoke(message_list)
return response
# SearchDependenceToolクラスをツールに設定
tools = [SearchDependenceTool()]
# llmインスタンスにツールをバインド
llm_tool = llm.bind_tools(tools)
# ツールノードにSearchDependenceToolクラスを設定
tool_node = ToolNode(tools)
async def main():
await compile_graph.ainvoke(
{
"parallel_num": parallel_num,
"convert_language": convert_language,
"read_dir_path": read_dir_path,
"input_ext_list": input_ext_list,
"no_processe_list": no_processe_list,
"output_ext_dict": output_ext_dict,
"llm": llm,
"llm_tool": llm_tool
}
)
# 通常ノードで使用するState
class GraphState(TypedDict):
parallel_num: int # 並列処理数
convert_language: str # 変換先の言語
read_dir_path: str # 処理対象ファイルの格納パス
input_ext_list: list # 処理対象ファイルの拡張子
no_processe_list: list # 処理対象外のファイル名・フォルダ名
output_ext_dict: dict # 出力ファイルの拡張子マッピング
llm: BaseChatModel # llmインスタンス
llm_tool: BaseChatModel # ツールをバインドしたllmインスタンス
all_task_file_list: list[tuple[str, str]] # 処理を行う全ファイルリスト
task_file_list: list[tuple[str, str]] # 処理中のファイルリスト
create_document_dict: Annotated[dict[str, list[BaseMessage]], operator.or_] # 作成した全詳細設計書の辞書
convert_document_dict: Annotated[dict[str, str], operator.or_] # 言語変換した全詳細設計書の辞書
create_code_dict: Annotated[dict[str, tuple[str, str]], operator.or_] # 生成した全コードの辞書
task_complete: bool # 全タスクの完了判定
# 並列処理ノードで使用するState
class ParallelState(TypedDict):
convert_language: str # 変換先の言語
llm: BaseChatModel # llmインスタンス
llm_tool: BaseChatModel # ツールをバインドしたllmインスタンス
task_file: tuple[str, str] # 処理中のファイル
create_document: tuple[str, list[BaseMessage]] # 作成した詳細設計書
convert_document: tuple[str, str] # 言語変換した詳細設計書
create_code: tuple[str, str, str] # 生成したコード
# 処理対象となる全ファイルを取得するノード
def read_dir(state: GraphState):
print("__START__")
print("__read_dir__")
read_dir_path = state["read_dir_path"]
input_ext_list = state["input_ext_list"]
no_processe_list = state["no_processe_list"]
all_task_file_list = []
# フォルダを再帰的に探索
for dir_path, dir_names, file_name_list in os.walk(read_dir_path):
# 処理対象外のディレクトリを除外
for no_processe in no_processe_list:
if no_processe in dir_names:
dir_names.remove(no_processe)
for i, (file_name) in enumerate(file_name_list, 1):
# 処理対象外のファイルをスキップ
if file_name in no_processe_list:
continue
# 指定された拡張子以外のファイルをスキップ
_, ext = os.path.splitext(file_name)
if ext.lstrip(".") not in input_ext_list:
continue
# ファイルパスを生成
file_path = os.path.join(dir_path, file_name)
print(f"処理予定ファイル{i}: {file_path}")
# ファイルのエンコーディングを判定
with open(file_path, "rb") as file:
raw_data = file.read()
result = chardet.detect(raw_data)
encoding = result["encoding"] or "utf-8"
# 判定したエンコーディングでファイルを読み込み
with open(file_path, "r", encoding=encoding) as file:
code = file.read()
all_task_file_list.append((file_path, code))
return {"all_task_file_list": all_task_file_list}
# 処理対象ファイルを読み込むノード
def read_file(state: GraphState):
print("__read_file__")
all_task_file_list = state["all_task_file_list"]
parallel_num = state["parallel_num"]
# 並列処理数と残りファイル数比較して小さい方を取得
file_num = min(len(all_task_file_list), parallel_num)
task_file_list = all_task_file_list[:file_num]
# 残りの処理数を更新
all_task_file_list = all_task_file_list[file_num:]
return {"all_task_file_list": all_task_file_list, "task_file_list": task_file_list}
# 各ファイルをメッセージ生成の並列ノードに分配
def send_document_massege(state: GraphState):
task_file_list = state["task_file_list"]
return [
Send(
"parallel_document_massege",
{
"task_file": task_file,
}
)
for task_file in task_file_list
]
# llmに渡す詳細設計書作成メッセージを生成する並列ノード
def parallel_document_massege(state: ParallelState):
print("__parallel_document_massege__")
file_path, code = state["task_file"]
print(f"詳細設計書作成用メッセージの生成: {file_path}")
system_message = "あなたはプログラムから具体的な詳細設計書を作成する熟練のエンジニアです。"
human_message = """
# タスク
プログラムから詳細設計書を作成します。
# ルール
ロジックの省略はせずに具体的で完全な詳細設計書を作成してください。
# 指示
- ルールに従い、プログラムから詳細設計書を作成してください。詳細設計書以外は出力しないでください。
- マークダウン形式で記載してください。また、mermaidによるフローチャートを記載してください。
- 処理は実装するソースコードをそのまま記載するのではなく、わかりやすいように文章と疑似コードで表現してください。
- 誰が読んでも同じ処理を実装できるように明確な詳細設計書を作成してください。
- プログラムを実行した際に文言が出力される場合、同じ文言が出力できるように詳細設計書内に記載してください。
- プログラム内で依存している(呼び出しを行っている)ローカルファイルのプログラムがある場合は、プログラムが配置されているパスから絶対パスを正確に推測してツールに渡してください。
- 依存しているローカルファイルの中身を取り込んで**一つのプログラムとして完結、動作するように**詳細設計書を作成してください。
**ロジックの省略が行われていないか細部に至るまで段階的に確認して、欠けることなく全てのロジックを記述してください**
# プログラムが配置されているパス
{file_path}
(上記のパスは呼び出しているファイルのパスではありません。詳細設計書に変換するプログラムの絶対パスです。)
# プログラム
{code}"""
prompt = ChatPromptTemplate.from_messages(
[
("system", system_message),
("human", human_message),
]
)
message_list = prompt.format_messages(
file_path=file_path,
code=code
)
# ファイルパスをキーとしたメッセージの辞書を作成
create_document = {file_path: message_list}
# 詳細設計書の辞書を更新
return {"create_document_dict": create_document}
# メッセージの辞書を詳細設計書を作成する並列ノードに分配
def send_create_document(state: GraphState):
create_document_dict = state["create_document_dict"]
llm_tool = state["llm_tool"]
return [
Send(
"parallel_create_document",
{
"create_document": create_document,
"llm_tool": llm_tool
}
)
for create_document in create_document_dict.items()
]
# 詳細設計書を作成、またはツールの呼出し用メッセージを作成する並列ノード
async def parallel_create_document(state: ParallelState):
print("__parallel_create_document__")
file_path, message_list = state["create_document"]
print(f"詳細設計書の作成、またはツールの呼出し設定: {file_path}")
llm_tool = state["llm_tool"]
response = await llm_tool.ainvoke(message_list)
message_list.append(response)
# ファイルパスをキーとしたメッセージの辞書を作成
create_document = {file_path: message_list}
# 詳細設計書の辞書を更新
return {"create_document_dict": create_document}
# メッセージの辞書をツールの呼び出しを行う並列ノード、または詳細設計書を作成する並列ノードに分配
def send_tool_or_convert(state: GraphState):
convert_language = state["convert_language"]
create_document_dict = state["create_document_dict"]
llm = state["llm"]
result = []
for create_document in create_document_dict.items():
if create_document[-1][-1].tool_calls:
# ツール呼び出し設定が含まれている場合
result.append(
Send(
"parallel_search_dependence_tool",
{
"create_document": create_document,
}
)
)
else:
# ツール呼び出し設定が含まれていない場合
result.append(
Send(
"parallel_convert_document",
{
"convert_language": convert_language,
"create_document": create_document,
"llm": llm
}
)
)
return result
# 依存ファイルの検索ツールを呼び出す並列ノード
async def parallel_search_dependence_tool(state: ParallelState):
print("__parallel_search_dependence_tool__")
file_path, message_list = state["create_document"]
print(f"ローカル依存ファイルの探索: {file_path}")
# ツールを呼び出して依存関係の検索を実行
response = await tool_node.ainvoke({"messages": message_list})
message_list.append(response["messages"][0])
# ファイルパスをキーとしたメッセージの辞書を作成
create_document = {file_path: message_list}
# 詳細設計書の辞書を更新
return {"create_document_dict": create_document}
# メッセージの辞書を詳細設計書を変換する並列ノードに分配
def send_convert_document(state: GraphState):
convert_language = state["convert_language"]
create_document_dict = state["create_document_dict"]
llm = state["llm"]
return [
Send(
"parallel_convert_document",
{
"convert_language": convert_language,
"create_document": create_document,
"llm": llm
}
)
for create_document in create_document_dict.items()
]
# 詳細設計書を指定された言語用に変換する並列ノード
async def parallel_convert_document(state: ParallelState):
print("__parallel_convert_document__")
convert_language = state["convert_language"]
file_path, message_list = state["create_document"]
print(f"{convert_language}用に詳細設計書を変換: {file_path}")
document = message_list[-1].content
llm = state["llm"]
system_message = "あなたは詳細設計書を**正確に**{convert_language}言語の詳細設計書に変換する熟練のエンジニアです。"
human_message = """
# タスク
詳細設計書を**正確**に{convert_language}の言語仕様に沿った詳細設計書に変換します。
# ルール
ロジックの省略はせずに具体的で完全な詳細設計書を作成してください。
# 指示
- ルールに従い、詳細設計書を**正確**に{convert_language}の言語仕様に沿った詳細設計書に変換してください。詳細設計書以外は出力しないでください。
- マークダウン形式で記載してください。また、mermaidによるフローチャートを記載してください。
- 処理は実装するソースコードをそのまま記載するのではなく、わかりやすいように文章と疑似コードで表現してください。
- 誰が読んでも同じ処理を実装できるように明確な詳細設計書を作成してください。
- ローカル依存ファイルの呼び出しはせずに**一ファイルのプログラムとして完結、動作するように**詳細設計書を作成してください。
**ロジックの省略が行われていないか細部に至るまで段階的に確認して、欠けることなく全てのロジックを記述してください**
# 詳細設計書
{document}
# {convert_language}の詳細設計書"""
prompt = ChatPromptTemplate.from_messages(
[
("system", system_message),
("human", human_message),
]
)
message_list = prompt.format_messages(
convert_language=convert_language,
document=document
)
response = await llm.ainvoke(message_list)
output_convert_document = response.content
# ファイルパスをキーとした変換した詳細設計書の辞書を作成
convert_document = {file_path: output_convert_document}
# 変換した詳細設計書の辞書を更新
return {"convert_document_dict": convert_document}
# 変換した詳細設計書の辞書をコードを生成する並列ノードに分配
def send_create_code(state: GraphState):
llm = state["llm"]
convert_document_dict = state["convert_document_dict"]
return [
Send(
"parallel_create_code",
{
"convert_document": convert_document,
"llm": llm
}
)
for convert_document in convert_document_dict.items()
]
# コードを生成する並列処理ノード
async def parallel_create_code(state: ParallelState):
print("__parallel_create_code__")
file_path, convert_document = state["convert_document"]
llm = state["llm"]
print(f"コードを生成: {file_path}")
system_message = "あなたは詳細設計書から**本番環境で動作する**プログラムを実装する熟練のエンジニアです。"
human_message = """
# タスク
詳細設計書からプログラムの生成を行います。
# ルール
ロジックの省略はせずに具体的で完全なプログラムを作成してください。特にGetterとSetterに関しては省略しないように注意してください。
# 指示
- ルールに従い、詳細設計書からプログラムを生成してください。
- 説明や補足、プログラム内のコメントは出力しないでください。コードブロックで囲まれたプログラムのみ出力してください。
- プログラムは分割せずに一ファイルで出力してください。
- ローカル依存ファイルの呼び出しはせずに**一ファイルのプログラムとして完結、動作するように**生成してください。
**ロジックの省略が行われていないか細部に至るまで段階的に確認して、欠けることなく全てのロジックを実装してください。**
**プログラムを貼り付けただけで動作する状態にしてください。**
# 詳細設計書
{convert_document}
# プログラム"""
prompt = ChatPromptTemplate.from_messages(
[
("system", system_message),
("human", human_message),
]
)
message_list = prompt.format_messages(
convert_document=convert_document
)
response = await llm.ainvoke(message_list)
output_code = response.content
# ファイルパスをキーとした(変換済の)詳細設計書とコードの辞書を作成
create_code = {file_path: (convert_document, output_code)}
# (変換済の)詳細設計書の辞書を更新
return {"create_code_dict": create_code}
# 処理対象ファイルのタスク完了を判定するノード
def check_task_completion(state: GraphState):
create_code_dict = state["create_code_dict"]
task_file_list = state["task_file_list"]
# 完了したタスク(ファイルパス)をセットとして取得
complete_task_name_set = set(create_code_dict)
# 処理対象のタスク(ファイルパス)をセットとして取得
task_name_set = {task_file[0] for task_file in task_file_list}
# 完了したタスクに処理対象のタスクが全て含まれている場合True
task_complete = task_name_set.issubset(complete_task_name_set)
return {"task_complete": task_complete}
# 次の処理対象ファイルを読み込むノード、またはバックアップ処理ノードに進むかを判断するエッジ
def read_or_end(state: GraphState):
all_task_file_list = state["all_task_file_list"]
task_complete = state["task_complete"]
# 処理対象のタスクが完了していない場合はスキップ
if not task_complete:
return "skip"
# 処理対象ファイルがまだ残っている場合は次の処理対象ファイルを読み込むノードへ
if len(all_task_file_list) != 0:
return "read_file"
# 全ての処理が完了した場合はバックアップ処理ノードへ
else:
return "move_bac"
# 元ののファイルをバックアップするノード
def move_bac(state: GraphState):
print("__move_bac__")
read_dir_path = state["read_dir_path"]
bac_dir = os.path.join(read_dir_path, "bac")
print(f"ファイルをバックアップフォルダに移動\n{read_dir_path} -> {bac_dir}")
# バックアップディレクトリの作成
os.makedirs(bac_dir, exist_ok=True)
# 指定ディレクトリ内の全ファイル・フォルダを処理
for name in os.listdir(read_dir_path):
# バックアップディレクトリは処理しない
if name == "bac":
continue
file_path = os.path.join(read_dir_path, name)
dest_path = os.path.join(bac_dir, name)
# ディレクトリの場合の処理
if os.path.isdir(file_path):
# 移動先に同名のディレクトリが存在する場合は削除
if os.path.exists(dest_path):
shutil.rmtree(dest_path)
# バックアップディレクトリにコピーして元のディレクトリを削除
shutil.copytree(file_path, dest_path)
shutil.rmtree(file_path)
# ファイルの場合の処理
else:
# バックアップディレクトリに移動
shutil.move(file_path, dest_path)
# 変換された詳細設計書とコードを保存するノード
def save_files(state: GraphState):
print("__save_files__")
create_code_dict = state["create_code_dict"]
output_ext_dict = state["output_ext_dict"]
# (変換済の)詳細設計書とコードの辞書を処理
for file_path, (document, code) in create_code_dict.items():
# 保存先ディレクトリのパスを作成
save_dir = os.path.dirname(file_path)
os.makedirs(save_dir, exist_ok=True)
# ファイル名から拡張子を除いた名前を取得
name = os.path.splitext(os.path.basename(file_path))[0]
# 詳細設計書をMarkdownファイルとして保存
save_document_path = os.path.join(save_dir, name+"_document.md")
print(f"詳細設計書を保存: {save_document_path}")
with open(save_document_path, "w", encoding="utf-8") as file:
file.write(document)
# コードブロックを抽出するための正規表現パターン
pattern = r"```(\w+)\n(.*?)\n?\s*```"
# コードブロックの言語識別子とコードを抽出
code_block_list = re.findall(pattern, code, re.DOTALL)
# 抽出された言語識別子を拡張子に変換してコードを保存(コードブロックが二つ以上の場合は連番を付けて保存)
for i, (lang, code) in enumerate(code_block_list):
ext = output_ext_dict.get(lang, lang)
if i == 0:
save_code_path = os.path.join(save_dir, f"{name}.{ext}")
else:
save_code_path = os.path.join(save_dir, f"{name}_other_{i + 1}.{ext}")
print(f"コードを保存: {save_document_path}")
with open(save_code_path, "w") as file:
file.write(code.strip())
print("__end__")
# グラフの初期化
graph = StateGraph(GraphState)
# グラフのワークフローを構成
graph.set_entry_point("read_dir")
graph.add_node("read_dir", read_dir)
graph.add_edge("read_dir", "read_file")
graph.add_node("read_file", read_file)
graph.add_conditional_edges(
"read_file",
send_document_massege,
[
"parallel_document_massege"
]
)
graph.add_node("parallel_document_massege", parallel_document_massege)
graph.add_conditional_edges(
"parallel_document_massege",
send_create_document,
[
"parallel_create_document"
]
)
graph.add_node("parallel_create_document", parallel_create_document)
graph.add_conditional_edges(
"parallel_create_document",
send_tool_or_convert,
[
"parallel_search_dependence_tool",
"parallel_convert_document"
]
)
graph.add_node("parallel_search_dependence_tool", parallel_search_dependence_tool)
graph.add_conditional_edges(
"parallel_search_dependence_tool",
send_create_document,
[
"parallel_create_document"
]
)
graph.add_node("parallel_convert_document", parallel_convert_document)
graph.add_conditional_edges(
"parallel_convert_document",
send_create_code,
[
"parallel_create_code"
]
)
graph.add_node("parallel_create_code", parallel_create_code)
graph.add_edge("parallel_create_code", "check_task_completion")
graph.add_node("check_task_completion", check_task_completion)
graph.add_conditional_edges(
"check_task_completion",
read_or_end,
{
"skip": END,
"read_file": "read_file",
"move_bac": "move_bac"
}
)
graph.add_node("move_bac", move_bac)
graph.add_edge("move_bac", "save_files")
graph.add_node("save_files", save_files)
graph.set_finish_point("save_files")
# グラフのコンパイル
compile_graph = graph.compile()
# ワークフロー図をMermaid形式で保存
Image(compile_graph.get_graph().draw_mermaid_png(output_file_path=save_graph_img_path))
if __name__ == "__main__":
start_time = time.time()
asyncio.run(main())
processing_time = time.time() - start_time
hours, remainder = divmod(processing_time, 3600)
minutes, seconds = divmod(remainder, 60)
# 処理時間を出力
print(f"\n処理時間: {hours:02.0f}h:{minutes:02.0f}m:{seconds:04.1f}s")
上記コード内の以下について説明します。
- パラメータの設定
- ツールの動作
- 並列ノード(Send)について
パラメータの設定
# llmのAPIキー
api_key=""
# 並列処理数
parallel_num = 3
# 変換先の言語
convert_language = "python"
# 処理対象ファイルの格納パス
read_dir_path = "./code_storage"
# 処理対象ファイルの拡張子
input_ext_list = ["go", "pro"]
# 処理対象外のファイル名・フォルダ名を設定
no_processe_list = ["bac", "example", "game", "board"]
# 出力ファイルの拡張子マッピング
output_ext_dict = {"python": "py"}
# LangGraphのワークフロー図を保存するパス
save_graph_img_path = "./graph_image.png"
# llmインスタンス
llm = ChatAnthropic(
model_name="claude-3-5-sonnet-latest",
max_tokens_to_sample=8192,
api_key=api_key
)
# 依存ファイルの検索を行う際の深さ制限
max_depth = 5
# 依存ファイル検索の並列処理数を制限
semaphore = asyncio.Semaphore(parallel_num)
上記がパラメータ部分です。基本的にはコード全体をコピペしてここを設定すれば動(くはず・・・)きます。
今回は実行しやすくするため(apiキー含めて)ハードコーディングしていますが、本来は環境変数などを利用した方がよいかと思います。
以下がパラメータの内容です。
-
api_key: llmのAPIキー
こちらに取得しているapiキーを入力してください。自分はanthropicを使用しましたが、他のllmを使用する際にはここと「llm」のパラメータを書き換えてください。 -
parallel_num: 並列処理数
同時にいくつのファイルを同時に処理するかをここで設定します。 -
convert_language: 変換先の言語
処理対象のプログラム群をここで設定した言語に一律で変換します。 -
read_dir_path: 処理対象ファイルの格納パス
処理対象のファイル(プログラム)群が格納されているフォルダ(複数、またはネストも可)を指定してください。 -
input_ext_list: 処理対象ファイルの拡張子
処理対象となるファイルをここで設定した拡張子で絞ります(フィルター)。
ツール内における依存ファイルの集約対象には、ここで設定した値は適用されません。- 例) ファイル1 → 依存ファイル1 → 依存ファイル2
上記、ファイル1のみにinput_ext_listを適用
- 例) ファイル1 → 依存ファイル1 → 依存ファイル2
-
no_processe_list: 処理対象外のファイル名・フォルダ名を設定
処理対象から除外するフォルダ・ファイル名を設定します。
「input_ext_list」同様、ツール内における依存ファイルの集約対象には、ここで設定した値は適用されません。。 -
output_ext_dict: 出力ファイルの拡張子マッピング
最終的なコードの保存に使用します。
コードをファイルに保存する際はllmの出力からコードブロックを抽出して保存します。その際にコードブロックの言語識別子を保存ファイルの拡張子に利用します。javaなど言語識別子と拡張子が同じ場合は設定する必要はありませんが、例えばpythonの場合は言語識別子が「python」、拡張子が「py」になるため設定する必要があります(設定していない場合、言語識別子がそのまま拡張子になります)。 -
save_graph_img_path: LangGraphのワークフロー図を保存するパス
ワークフロー図を保存する際のパスを設定します
必要ない場合はコード内の以下の部分と一緒に削除してください。parallel_program_converter.py# ワークフロー図をMermaid形式で保存 Image(compile_graph.get_graph().draw_mermaid_png(output_file_path=save_graph_img_path))
-
llm: llmインスタンス
llmのパラメータを設定してインスタンスを取得します。
他のllmを使用する際はここと「api_key」を変更してください。 -
max_depth: 依存ファイルの検索を行う際の深さ制限
ツール内で依存ファイルの検索を行う際の深さの制限(最大ネスト数)を設定します。- 例)ファイル内でインポートしたファイル内でさらにファイルをインポートしていた。
上記の場合、深さは"2"になります。
依存ファイルの検索はファイル内のコードからllmを使用して行います(一つのファイル内から依存ファイルを探すのにllmへのAPIリクエストを一度実行します)。
インポートが複数に枝分かれした場合、検索するファイルも増大するので注意してください。 -
semaphore: 依存ファイル検索の並列処理数制限
ツール内で依存ファイル検索を行う際の並列処理数をセマフォで制限します(デフォルトで「parallel_num」の値を渡しています)。
依存ファイルの検索が複数に分岐した場合、並列処理数も増えます。制限がない場合は依存ファイル検索の並列処理数(llmへのAPI同時リクエスト数)も増大してしまうため、セマフォで並列処理数に制限を設けました。
ツールの動作
# 依存ファイルを検索するツール
class SearchDependenceTool(BaseTool):
name: str = "search_dependence_tool"
description: str = """
対象のプログラムの中で依存している(呼び出しを行っている)ローカルファイルのプログラムがある場合、このツールにそのローカルファイルの絶対パスを与えます(複数可)。
このツールは与えられた絶対パスからファイルの中身を返します。
Args:
file_path_list (List[str]): 依存しているローカルファイルのパスのリスト
Returns:
str: 以下のフォーマットで整形されたファイル内容を含む文字列を返します。
File: [ファイルパス]
```
[ファイル内容]
```
エラーが発生した場合は、エラーメッセージが内容として含まれます。"""
# 同期処理メソッド(非対応)
def _run(self, file_path_list: list[str]):
raise NotImplementedError("同期処理には非対応")
# 非同期処理メソッド
async def _arun(self, file_path_list: list[str]):
# 依存ファイルのパスと中身を格納する文字列
result = ""
# 依存ファイル内のパスリストと深さを格納するキュー
task_queue = asyncio.Queue()
# 処理済みのファイルパスを格納するセット
processed_file_path_set = set()
# キューに初期値を設定
await task_queue.put((file_path_list, 1))
# キューが空になるまで処理を継続
while not task_queue.empty():
# キューからファイルパスリストと深さを取得
file_path_list, current_depth = await task_queue.get()
for path in file_path_list:
# すでに処理済みのファイルはスキップ
if path in processed_file_path_set:
continue
# 処理済みのパスとして格納
processed_file_path_set.add(path)
# ファイルのパスと中身をresultに追加
result += f"File: {path}\n"
result += "```" + "\n"
content = ""
try:
async with aiofiles.open(path, "r", encoding="utf-8") as f:
content = await f.read()
result += content.strip() + "\n"
except Exception as e:
result += f"Error reading file: {str(e)}\n"
result += "```" + "\n\n"
continue
result += "```" + "\n\n"
# 深さの制限に達したファイルは処理をスキップ
if current_depth >= max_depth:
continue
# semaphore_invokeメソッドを呼び出して依存ファイルの検索を実行
response = await self.semaphore_invoke(path, content)
# 依存ファイルが確認された場合、キューにファイルのパスを追加
if response.tool_calls:
file_path_list = response.tool_calls[0]["args"]["file_path_list"]
await task_queue.put((file_path_list, current_depth+1))
return result
# 依存ファイル検索するメソッド
async def semaphore_invoke(self, path, content):
# セマフォで並列処理数を制限
async with semaphore:
human_message = """
プログラム内で依存している(呼び出しを行っている)ローカルファイルがある場合、ツールを呼び出してください。
# プログラムが配置されているパス
{path}
(上記のパスは依存ファイルのパスではありません)
# プログラム(以下のプログラムから呼び出しを行っているファイルがあるか確認します)
{code}"""
prompt = ChatPromptTemplate.from_messages(
[
("human", human_message),
]
)
message_list = prompt.format_messages(
path=path,
code=content
)
response = await llm_tool.ainvoke(message_list)
return response
ツールの動作(処理手順)は以下のようになっています。
-
ツールは「parallel_search_dependence_tool」ノードから並列で呼び出されます。
引数は依存ファイルのパスリスト(ファイル内に複数の依存ファイルがあることを考慮してリスト)です。 -
以下の変数を定義します。
-
result: 依存ファイルのパスと中身を格納する文字列
依存ファイルが見つかるとこの変数にそのファイルパスと中身が入れられます。
全ての検索が終了するとこの変数が返されます。 -
task_queue: 依存ファイル内のパスリストと深さを格納するキュー
検索したファイル内で依存するファイルがあると確認された場合、そのファイルパスリストと深さ(ツールに渡したファイルパスのリストを"1"としたネスト数)を処理予定のタスクとしてこの変数に格納します。 -
processed_file_path_set: 処理済みのファイルパスを格納するセット
複数のファイルから同一のファイルが呼び出されている場合、「task_queue」に同じファイルパスが入ってしまいます。
同じファイルは二回以上処理する必要がないため、この変数に処理済みのファイルパスを入れて「すでに処理済みのファイルかどうか」を判断するようにしています。
-
-
ループで「task_queue」からファイルパスを一つずつ取得して「result」にファイルパスとその中身を追加します。追加したファイル内に依存ファイルがあった場合(llmを使い検索)、「task_queue」にその依存ファイルのパスリストと深さを追加します。
-
「task_queue」にあるすべてのファイルパスを処理後、「result」を返します。
並列ノード(Send)について
通常ノードの場合、ワークフロー内を事前に定義した個数で処理していきます。
ただ今回の場合、同時に処理させたいファイル(一つのファイルあたり一つのノードで処理)の数をパラメータで設定したかったため、「Send」を使ってワークフローの中で動的に割り振ることにしました。
Sendは主に次のノード数が決まっておらず、ワークフロー内で動的に変化させたい場合に使う機能です。条件付きエッジ(add_conditional_edges)と組み合わせて使用します。
参考:
コード内では以下のように使っています。
例) 各ファイルをメッセージ生成の並列ノードに分配するSend
def send_document_massege(state: GraphState):
task_file_list = state["task_file_list"]
return [
Send(
"parallel_document_massege",
{
"task_file": task_file,
}
)
for task_file in task_file_list
]
上記、Sendに渡していている第一引数("parallel_document_massege")が次に処理する並列にしたいノード名です。第二引数は並列ノードに渡すデータ(今回の場合は処理対象のファイル)です。
リスト内包表記を使っているため「task_file_list」の個数分、Sendを用いて「parallel_document_massege」ノードに割り振られます。
そのため、「parallel_document_massege」ノードは「task_file_list」の個数分存在する並列ノードとなります。
実行準備(サンプルプログラム)
三つのサンプルプログラムを対象として動作を確認してみます。
(以下のサンプルプログラムに関してはllmで生成 + 手直しで作成しています。)
サンプルプログラム(一つ目)
プログラム
一つ目は、go言語で他ファイルの呼び出しと結果の表示を行うシンプルなプログラムです。
package main
import (
"fmt"
"sample_1/example"
)
func main() {
exampleMessage := example.GetMessage()
exampleData := example.GetData()
fmt.Println(exampleMessage)
fmt.Println(exampleData)
}
package example
func GetMessage() string {
return outputMessage()
}
func GetData() []int {
return outputData()
}
package example
func outputMessage() string {
return "massage_example"
}
package example
func outputData() []int {
return []int{1, 2, 3, 4, 5}
}
ディレクトリ構造
sample_1/
├── main.go
└── example/
├── example.go
├── massage_example.go
└── data_example.go
ファイル間の依存関係
出力
massage_example
[1 2 3 4 5]
サンプルプログラム(二つ目)
二つ目は、C++のQtにおける基本的な機能(ボタンやラベル、画面遷移など)を確認するプログラムです。
プログラム
#include <QApplication>
#include <QStackedWidget>
#include "MainWindow.h"
#include "SubWindow.h"
int main(int argc, char *argv[]) {
QApplication app(argc, argv);
// スタックウィジェット
QStackedWidget stackedWidget;
// メインウィンドウ
MainWindow *mainWindow = new MainWindow(&stackedWidget);
stackedWidget.addWidget(mainWindow);
// サブウィンドウ
SubWindow *subWindow = new SubWindow(&stackedWidget);
stackedWidget.addWidget(subWindow);
// メインウィンドウの設定
stackedWidget.setFixedSize(600, 400);
stackedWidget.show();
return app.exec();
}
#include "MainWindow.h"
MainWindow::MainWindow(QStackedWidget *stackedWidget, QWidget *parent)
: QWidget(parent), stackedWidget(stackedWidget) {
// ラベル
label = new QLabel("initial_label", this);
// 入力フィールド
inputField = new QLineEdit(this);
inputField->setPlaceholderText("input_name");
// ボタン
QPushButton *button = new QPushButton("button", this);
connect(button, &QPushButton::clicked, this, &MainWindow::outputMessage);
// ラジオボタン
radio1 = new QRadioButton("option_1", this);
radio2 = new QRadioButton("option_2", this);
radio1->setChecked(true);
// コンボボックス
comboBox = new QComboBox(this);
comboBox->addItems({"1", "2", "3"});
// 画面遷移ボタン
QPushButton *exampleButton = new QPushButton("sub_window", this);
connect(exampleButton, &QPushButton::clicked, this, &MainWindow::goToExample);
// レイアウト作成
QVBoxLayout *layout = new QVBoxLayout(this);
// 上部
layout->addWidget(label);
layout->addWidget(inputField);
layout->addWidget(button);
// 中央
QHBoxLayout *optionLayout = new QHBoxLayout();
optionLayout->addWidget(radio1);
optionLayout->addWidget(radio2);
layout->addLayout(optionLayout);
// 下部
layout->addWidget(comboBox);
layout->addWidget(exampleButton);
setLayout(layout);
}
void MainWindow::outputMessage() {
QString name = inputField->text();
QString message = "Hello, " + name + "! ";
message += radio1->isChecked() ? "(option_1)" : "(option_2)";
label->setText(message);
}
void MainWindow::goToExample() {
stackedWidget->setCurrentIndex(1);
}
#ifndef MAINWINDOW_H
#define MAINWINDOW_H
#include <QWidget>
#include <QLabel>
#include <QLineEdit>
#include <QPushButton>
#include <QCheckBox>
#include <QRadioButton>
#include <QComboBox>
#include <QVBoxLayout>
#include <QHBoxLayout>
#include <QStackedWidget>
class MainWindow : public QWidget {
Q_OBJECT
public:
explicit MainWindow(QStackedWidget *stackedWidget, QWidget *parent = nullptr);
private slots:
void outputMessage();
void goToExample();
private:
QStackedWidget *stackedWidget;
QLabel *label;
QLineEdit *inputField;
QCheckBox *checkbox;
QRadioButton *radio1;
QRadioButton *radio2;
QComboBox *comboBox;
};
#endif
QT += widgets
TARGET = sample_2
TEMPLATE = app
SOURCES += main.cpp \
MainWindow.cpp \
SubWindow.cpp
HEADERS += MainWindow.h \
SubWindow.h
#include "SubWindow.h"
SubWindow::SubWindow(QStackedWidget *stackedWidget, QWidget *parent)
: QWidget(parent), stackedWidget(stackedWidget) {
// ラベル
QLabel *label = new QLabel("sub_window", this);
// exampleメッセージ用ラベル
localMessageLabel = new QLabel("initial_label", this);
// 戻るボタン
QPushButton *backButton = new QPushButton("back", this);
connect(backButton, &QPushButton::clicked, this, &SubWindow::goToMain);
// 更新ボタン
QPushButton *updateButton = new QPushButton("button", this);
connect(updateButton, &QPushButton::clicked, this, &SubWindow::updateExampleMessage);
// レイアウト
QVBoxLayout *layout = new QVBoxLayout(this);
layout->addWidget(label);
layout->addWidget(localMessageLabel);
layout->addWidget(updateButton);
layout->addWidget(backButton);
setLayout(layout);
}
void SubWindow::updateExampleMessage() {
QString message = getExampleMessage();
localMessageLabel->setText(message);
}
void SubWindow::goToMain() {
stackedWidget->setCurrentIndex(0);
}
QString SubWindow::getExampleMessage() {
return "button_pressed";
}
#ifndef SUBWINDOW_H
#define SUBWINDOW_H
#include <QWidget>
#include <QLabel>
#include <QPushButton>
#include <QVBoxLayout>
#include <QStackedWidget>
class SubWindow : public QWidget {
Q_OBJECT
public:
explicit SubWindow(QStackedWidget *stackedWidget, QWidget *parent = nullptr);
private slots:
void updateExampleMessage();
void goToMain();
private:
QStackedWidget *stackedWidget;
QLabel *localMessageLabel;
QString getExampleMessage();
};
#endif
ディレクトリ構造
sample_2/
├── sample_2.pro
├── main.cpp
├── MainWindow.h
├── MainWindow.cpp
├── SubWindow.h
└── SubWindow.cpp
ファイル間の依存関係
出力
-
メイン画面
「sub_window」を押下するとサブ画面に遷移します。 -
サブ画面
サンプルプログラム(三つ目)
三つ目は、go言語のライフゲームです。初期配置の状態(グライダー)からパラメータ(main.go内の「maxGenerations」)で設定した世代数までシミュレーションするプログラムです。
プログラム
package main
import (
"fmt"
"time"
"strings"
"sample_3/game"
)
func main() {
maxGenerations := 3 // 最大世代数を設定
game := game.NewGame(20, 20)
// 初期状態の設定(グライダー)
game.SetCell(1, 0, true)
game.SetCell(2, 1, true)
game.SetCell(0, 2, true)
game.SetCell(1, 2, true)
game.SetCell(2, 2, true)
for generation := 1; generation <= maxGenerations; generation++ {
// 現在の状態を表示
fmt.Printf("Generation: %d / %d\n", generation, maxGenerations)
fmt.Println(strings.Repeat("-", 41))
game.Print()
fmt.Println(strings.Repeat("-", 41))
if generation < maxGenerations {
game.Next()
time.Sleep(200 * time.Millisecond)
}
}
fmt.Println("Simulation completed!")
}
package game
import (
"fmt"
"sample_3/board"
)
type Game struct {
board *board.Board
}
func NewGame(width, height int) *Game {
return &Game{
board: board.NewBoard(width, height),
}
}
func (g *Game) countNeighbors(x, y int) int {
count := 0
for dy := -1; dy <= 1; dy++ {
for dx := -1; dx <= 1; dx++ {
if dx == 0 && dy == 0 {
continue
}
newX := (x + dx + g.board.Width) % g.board.Width
newY := (y + dy + g.board.Height) % g.board.Height
if g.board.Get(newX, newY) {
count++
}
}
}
return count
}
func (g *Game) Next() {
newBoard := board.NewBoard(g.board.Width, g.board.Height)
// 各セルの次世代の状態を計算
for y := 0; y < g.board.Height; y++ {
for x := 0; x < g.board.Width; x++ {
neighbors := g.countNeighbors(x, y)
alive := g.board.Get(x, y)
// ライフゲームのルールを適用
if alive && (neighbors == 2 || neighbors == 3) {
newBoard.Set(x, y, true)
} else if !alive && neighbors == 3 {
newBoard.Set(x, y, true)
}
}
}
g.board = newBoard
}
// セルの状態を設定するメソッド
func (g *Game) SetCell(x, y int, state bool) {
g.board.Set(x, y, state)
}
// ボードの状態を表示するメソッド
func (g *Game) Print() {
// 列番号の表示
fmt.Print(" ")
for x := 0; x < g.board.Width; x++ {
fmt.Printf("%-2d", x)
}
fmt.Println()
// 実際にセルが存在する行だけを表示するための範囲を計算
minY, maxY := g.getActiveBounds()
// 少なくとも5行は表示する
maxY = max(maxY, minY+4)
for y := minY; y <= maxY; y++ {
fmt.Printf("%3d ", y)
for x := 0; x < g.board.Width; x++ {
if g.board.Get(x, y) {
fmt.Print("■ ")
} else {
fmt.Print("□ ")
}
}
fmt.Println()
}
}
// アクティブなセルが存在する範囲を計算する補助関数
func (g *Game) getActiveBounds() (minY, maxY int) {
minY = g.board.Height
maxY = 0
for y := 0; y < g.board.Height; y++ {
for x := 0; x < g.board.Width; x++ {
if g.board.Get(x, y) {
if y < minY {
minY = y
}
if y > maxY {
maxY = y
}
}
}
}
// アクティブなセルが見つからない場合
if minY > maxY {
minY = 0
maxY = 4 // デフォルトで5行表示
}
// 表示範囲の前後に1行ずつ余白を追加
minY = max(0, minY-1)
maxY = min(g.board.Height-1, maxY+1)
return minY, maxY
}
func max(a, b int) int {
if a > b {
return a
}
return b
}
func min(a, b int) int {
if a < b {
return a
}
return b
}
package board
type Board struct {
cells [][]bool
Width, Height int
}
func NewBoard(width, height int) *Board {
cells := make([][]bool, height)
for i := range cells {
cells[i] = make([]bool, width)
}
return &Board{
cells: cells,
Width: width,
Height: height,
}
}
func (b *Board) Get(x, y int) bool {
return b.cells[y][x]
}
func (b *Board) Set(x, y int, state bool) {
b.cells[y][x] = state
}
ディレクトリ構造
sample_3/
├── main.go
├── game/
│ └── game.go
└── board/
└── board.go
ファイル間の依存関係
出力
Generation: 1 / 3
-----------------------------------------
0 1 2 3 4 5 6 7 8 9 10111213141516171819
0 □ ■ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
1 □ □ ■ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
2 ■ ■ ■ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
3 □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
4 □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
-----------------------------------------
Generation: 2 / 3
-----------------------------------------
0 1 2 3 4 5 6 7 8 9 10111213141516171819
0 □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
1 ■ □ ■ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
2 □ ■ ■ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
3 □ ■ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
4 □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
-----------------------------------------
Generation: 3 / 3
-----------------------------------------
0 1 2 3 4 5 6 7 8 9 10111213141516171819
0 □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
1 □ □ ■ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
2 ■ □ ■ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
3 □ ■ ■ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
4 □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
-----------------------------------------
Simulation completed!
サンプルプログラムの配置
作成したサンプルプログラムを"./code_storage"の下に配置します。
code_storage/
├── sample_1/...
├── sample_2/...
└── sample_3/...
実行
パラメータに関しては、変更せずに実行しました(「api_key」は設定)。
パラメータを変更していない限り「sample_1」、「sample_2」、「sample_3」のプログラムがそれぞれ一つのpythonプログラムに集約、変換されます(python以外の言語に変換したい場合は「convert_language」を変更してください)。
実行結果
実行結果として「詳細設計書」と「依存ファイルの集約 & 言語変換されたプログラム」が生成されます。
サンプルプログラム(一つ目)
詳細設計書
詳細設計書
Python詳細設計書
1. 概要
標準出力にメッセージと数値データを表示するシンプルなアプリケーションです。
2. 構成
2.1 モジュール構成
single_file_program.py # 全ての機能を含む単一ファイル
2.2 依存関係
- 標準ライブラリのみを使用
3. 処理フロー
4. 詳細仕様
4.1 クラス構造
Example クラス
内部にMessageHandlerとDataHandlerの2つの内部クラスを持つ
MessageHandler クラス(Exampleの内部クラス)
メッセージ処理を担当
DataHandler クラス(Exampleの内部クラス)
データ処理を担当
4.2 メイン処理
入力
- なし
処理内容
- Exampleクラスのインスタンスを生成
- MessageHandlerインスタンスを取得してメッセージを取得・表示
- DataHandlerインスタンスを取得してデータを取得・表示
出力
- 標準出力:取得したメッセージと数値データ
疑似コード
if __name__ == "__main__":
# Exampleクラスのインスタンス生成
example = Example()
# メッセージ処理
message_handler = example.MessageHandler()
message = message_handler.get_message()
print(message)
# データ処理
data_handler = example.DataHandler()
data = data_handler.get_data()
print(data)
4.3 Example クラス
属性
- なし
メソッド
- なし(内部クラスのコンテナとしての役割のみ)
疑似コード
class Example:
class MessageHandler:
# MessageHandler実装
pass
class DataHandler:
# DataHandler実装
pass
4.4 MessageHandler クラス
属性
- _message: str(プライベート属性)
メソッド
get_message
- 入力:なし
- 処理:メッセージ文字列を生成
- 出力:str型のメッセージ
疑似コード
class MessageHandler:
def __init__(self):
self._message = "Example Message"
def get_message(self) -> str:
return self._message
4.5 DataHandler クラス
属性
- _data: list[int](プライベート属性)
メソッド
get_data
- 入力:なし
- 処理:整数リストを生成
- 出力:整数のリスト
疑似コード
class DataHandler:
def __init__(self):
self._data = [1, 2, 3, 4, 5]
def get_data(self) -> list[int]:
return self._data
5. エラー処理
このプログラムには明示的なエラー処理は実装されていません。
6. 注意事項
- すべての処理は単一ファイル内で完結します。
- クラス構造を用いて、元のGoプログラムの機能を再現します。
- 内部クラスを使用してパッケージ的な構造を模倣します。
7. 制限事項
- 並行処理は行われていません。
- ファイル操作や外部APIとの通信は行われていません。
- コマンドライン引数は受け付けていません。
8. 動作環境
- Python 3.7以上
- 追加のパッケージのインストールは不要
9. 型ヒント
- Python 3.7以上の型ヒントを使用
- list[int]、strなどの型を明示的に指定
生成されたプログラム
class Example:
class MessageHandler:
def __init__(self):
self._message = "Example Message"
def get_message(self) -> str:
return self._message
class DataHandler:
def __init__(self):
self._data = [1, 2, 3, 4, 5]
def get_data(self) -> list[int]:
return self._data
if __name__ == "__main__":
example = Example()
message_handler = example.MessageHandler()
message = message_handler.get_message()
print(message)
data_handler = example.DataHandler()
data = data_handler.get_data()
print(data)
生成されたプログラムの実行結果
Example Message
[1, 2, 3, 4, 5]
もとのメッセージが「massage_example」だったのが「Example Message」に変わっています。
サンプルプログラム(二つ目)
詳細設計書
詳細設計書
Python Qt アプリケーション詳細設計書
1. モジュール構成
必要なモジュールのインポート:
- PyQt6.QtWidgets
- PyQt6.QtCore
- sys
2. クラス構成
2.1 MainWindow クラス
class MainWindow(QWidget):
プロパティ:
- label: QLabel型
- input_field: QLineEdit型
- button: QPushButton型
- radio_group: QButtonGroup型
- radio1: QRadioButton型
- radio2: QRadioButton型
- combo_box: QComboBox型
- sub_window_button: QPushButton型
- stacked_widget: QStackedWidget型 (外部参照)
2.2 SubWindow クラス
class SubWindow(QWidget):
プロパティ:
- title_label: QLabel型
- message_label: QLabel型
- update_button: QPushButton型
- back_button: QPushButton型
- stacked_widget: QStackedWidget型 (外部参照)
3. 詳細仕様
3.1 メインアプリケーション初期化
処理内容:
1. QApplication インスタンスを生成
2. QStackedWidget インスタンスを生成
3. ウィンドウサイズを設定 (600x400)
4. MainWindow インスタンスを生成し、スタックに追加
5. SubWindow インスタンスを生成し、スタックに追加
6. アプリケーションを表示
7. イベントループを開始
例外処理:
- システムエラー発生時はアプリケーションを終了
3.2 MainWindow クラスの実装
3.2.1 初期化処理
処理内容:
1. QWidget の初期化
2. stacked_widget の参照を保持
3. UIコンポーネントの生成
- QLabel ("initial_label")
- QLineEdit (placeholder="input_name")
- QPushButton ("button")
- QRadioButton x 2 ("option_1", "option_2")
- QComboBox (items=["1", "2", "3"])
- QPushButton ("sub_window")
4. レイアウトの設定
- QVBoxLayout を使用
- 各コンポーネントを垂直方向に配置
5. シグナル/スロット接続
- button.clicked → output_message
- sub_window_button.clicked → go_to_sub_window
3.2.2 メッセージ出力処理
function output_message():
処理内容:
1. 入力フィールドからテキストを取得
2. メッセージを生成
base_message = f"Hello, {name}! "
if radio1.isChecked():
message = base_message + "(option_1)"
else:
message = base_message + "(option_2)"
3. ラベルのテキストを更新
3.2.3 画面遷移処理
function go_to_sub_window():
処理内容:
1. stacked_widget.setCurrentIndex(1)
3.3 SubWindow クラスの実装
3.3.1 初期化処理
処理内容:
1. QWidget の初期化
2. stacked_widget の参照を保持
3. UIコンポーネントの生成
- QLabel ("sub_window")
- QLabel ("initial_label")
- QPushButton ("button")
- QPushButton ("back")
4. レイアウトの設定
- QVBoxLayout を使用
- 各コンポーネントを垂直方向に配置
5. シグナル/スロット接続
- update_button.clicked → update_message
- back_button.clicked → go_to_main
3.3.2 メッセージ更新処理
function update_message():
処理内容:
1. message_label.setText("button_pressed")
3.3.3 画面遷移処理
function go_to_main():
処理内容:
1. stacked_widget.setCurrentIndex(0)
4. レイアウト仕様
4.1 MainWindow レイアウト
レイアウト構造:
QVBoxLayout
├── QLabel (initial_label)
├── QLineEdit (input_name)
├── QPushButton (button)
├── QHBoxLayout
│ ├── QRadioButton (option_1)
│ └── QRadioButton (option_2)
├── QComboBox
└── QPushButton (sub_window)
4.2 SubWindow レイアウト
レイアウト構造:
QVBoxLayout
├── QLabel (sub_window)
├── QLabel (initial_label)
├── QPushButton (button)
└── QPushButton (back)
5. イベントフロー
6. エラー処理
エラー処理方針:
1. アプリケーション初期化エラー
- sys.exit(1) でアプリケーションを終了
2. GUI操作エラー
- 特別な処理は実装せず、デフォルトのQt動作に従う
7. メモリ管理
メモリ管理方針:
1. ウィジェットの親子関係
- MainWindow, SubWindow は QStackedWidget の子として管理
- 各UIコンポーネントは対応するウィンドウの子として管理
2. 参照管理
- stacked_widget への参照は各ウィンドウクラスで保持
- 他のウィジェットはインスタンス変数として保持
生成されたプログラム
from PyQt6.QtWidgets import QApplication, QWidget, QLabel, QLineEdit, QPushButton, QRadioButton, QComboBox, QVBoxLayout, QHBoxLayout, QStackedWidget, QButtonGroup
from PyQt6.QtCore import Qt
import sys
class MainWindow(QWidget):
def __init__(self, stacked_widget):
super().__init__()
self.stacked_widget = stacked_widget
self.label = QLabel("initial_label")
self.input_field = QLineEdit()
self.input_field.setPlaceholderText("input_name")
self.button = QPushButton("button")
self.radio_group = QButtonGroup()
self.radio1 = QRadioButton("option_1")
self.radio2 = QRadioButton("option_2")
self.radio_group.addButton(self.radio1)
self.radio_group.addButton(self.radio2)
self.radio1.setChecked(True)
radio_layout = QHBoxLayout()
radio_layout.addWidget(self.radio1)
radio_layout.addWidget(self.radio2)
self.combo_box = QComboBox()
self.combo_box.addItems(["1", "2", "3"])
self.sub_window_button = QPushButton("sub_window")
layout = QVBoxLayout()
layout.addWidget(self.label)
layout.addWidget(self.input_field)
layout.addWidget(self.button)
layout.addLayout(radio_layout)
layout.addWidget(self.combo_box)
layout.addWidget(self.sub_window_button)
self.setLayout(layout)
self.button.clicked.connect(self.output_message)
self.sub_window_button.clicked.connect(self.go_to_sub_window)
def output_message(self):
name = self.input_field.text()
base_message = f"Hello, {name}! "
if self.radio1.isChecked():
message = base_message + "(option_1)"
else:
message = base_message + "(option_2)"
self.label.setText(message)
def go_to_sub_window(self):
self.stacked_widget.setCurrentIndex(1)
class SubWindow(QWidget):
def __init__(self, stacked_widget):
super().__init__()
self.stacked_widget = stacked_widget
self.title_label = QLabel("sub_window")
self.message_label = QLabel("initial_label")
self.update_button = QPushButton("button")
self.back_button = QPushButton("back")
layout = QVBoxLayout()
layout.addWidget(self.title_label)
layout.addWidget(self.message_label)
layout.addWidget(self.update_button)
layout.addWidget(self.back_button)
self.setLayout(layout)
self.update_button.clicked.connect(self.update_message)
self.back_button.clicked.connect(self.go_to_main)
def update_message(self):
self.message_label.setText("button_pressed")
def go_to_main(self):
self.stacked_widget.setCurrentIndex(0)
def main():
try:
app = QApplication(sys.argv)
stacked_widget = QStackedWidget()
stacked_widget.setFixedSize(600, 400)
main_window = MainWindow(stacked_widget)
sub_window = SubWindow(stacked_widget)
stacked_widget.addWidget(main_window)
stacked_widget.addWidget(sub_window)
stacked_widget.show()
sys.exit(app.exec())
except:
sys.exit(1)
if __name__ == "__main__":
main()
生成されたプログラムの実行結果
-
メイン画面
-
サブ画面
枠線がなくタイトルに「.py」がついているということを除けば、ボタンやラベルに書かれているテキストは同じで機能も元のサンプルと同じ動作をしました。
サンプルプログラム(三つ目)
詳細設計書
詳細設計書
ライフゲーム シミュレーション Python詳細設計書
1. 概要
ライフゲーム(Conway's Game of Life)のシミュレーションを実装するPythonプログラム。20x20のグリッドで、グライダーパターンを初期状態として3世代分のシミュレーションを行う。
2. システム構成
2.1 クラス構成図
3. 詳細設計
3.1 Board クラス
生命が存在するセルの状態を管理する。
フィールド
cells: List[List[bool]] # セルの状態を保持する2次元配列
width: int # ボードの幅
height: int # ボードの高さ
メソッド
init(width: int, height: int)
Input: width(int), height(int)
Process:
1. self.width = width
2. self.height = height
3. self.cells = [[False] * width for _ in range(height)]を生成
Output: なし
get(x: int, y: int) -> bool
Input: x(int), y: int
Process:
1. cells[y][x]の値を返す
Output: セルの状態(bool)
set(x: int, y: int, state: bool)
Input: x(int), y(int), state(bool)
Process:
1. cells[y][x] = stateを設定
Output: なし
3.2 Game クラス
ライフゲームのロジックを実装する。
フィールド
board: Board # ボードの状態を管理するBoardインスタンス
メソッド
init(width: int, height: int)
Input: width(int), height(int)
Process:
1. self.board = Board(width, height)でボードを初期化
Output: なし
count_neighbors(x: int, y: int) -> int
Input: x(int), y(int)
Process:
1. neighbors = 0を初期化
2. 周囲8マスをループ:
for dx in [-1, 0, 1]:
for dy in [-1, 0, 1]:
if dx == 0 and dy == 0:
continue
newX = (x + dx + self.board.width) % self.board.width
newY = (y + dy + self.board.height) % self.board.height
if self.board.get(newX, newY):
neighbors += 1
Output: neighbors(int)
next()
Input: なし
Process:
1. new_board = Board(self.board.width, self.board.height)
2. 全セルをループ:
for y in range(self.board.height):
for x in range(self.board.width):
neighbors = self.count_neighbors(x, y)
current_state = self.board.get(x, y)
if current_state:
new_state = neighbors in [2, 3]
else:
new_state = neighbors == 3
new_board.set(x, y, new_state)
3. self.board = new_board
Output: なし
set_cell(x: int, y: int, state: bool)
Input: x(int), y(int), state(bool)
Process:
1. self.board.set(x, y, state)を実行
Output: なし
get_active_bounds() -> Tuple[int, int]
Input: なし
Process:
1. minY = self.board.height
2. maxY = 0
3. 全セルをループして生存セルを探索:
for y in range(self.board.height):
for x in range(self.board.width):
if self.board.get(x, y):
minY = min(minY, y)
maxY = max(maxY, y)
4. 生存セルがない場合:
if minY > maxY:
minY = 0
maxY = 4
5. 表示範囲の調整:
minY = max(0, minY - 1)
maxY = min(self.board.height - 1, maxY + 1)
Output: (minY, maxY)
print_board()
Input: なし
Process:
1. 列番号の表示:
print(" ", end="")
for x in range(self.board.width):
print(f"{x:2}", end=" ")
print()
2. アクティブな行の範囲を取得:
minY, maxY = self.get_active_bounds()
3. 各行の表示:
for y in range(minY, maxY + 1):
print(f"{y:2} ", end="")
for x in range(self.board.width):
cell = "■ " if self.board.get(x, y) else "□ "
print(cell, end="")
print()
Output: なし(標準出力に直接表示)
3.3 メイン処理
Input: なし
Process:
1. import time
2. 定数定義:
WIDTH = 20
HEIGHT = 20
MAX_GENERATIONS = 3
DELAY = 0.2 # 200ミリ秒
3. game = Game(WIDTH, HEIGHT)を初期化
4. グライダーパターンを設定:
初期配置 = [(1,0), (2,1), (0,2), (1,2), (2,2)]
for x, y in 初期配置:
game.set_cell(x, y, True)
5. シミュレーション実行:
for generation in range(1, MAX_GENERATIONS + 1):
print(f"Generation: {generation} / {MAX_GENERATIONS}")
print("-" * 41)
game.print_board()
print("-" * 41)
if generation < MAX_GENERATIONS:
game.next()
time.sleep(DELAY)
6. print("Simulation completed!")
Output: なし(標準出力に直接表示)
4. 出力形式
Generation: 1 / 3
-----------------------------------------
0 1 2 3 4 5 6 7 8 9 ...
0 □ □ □ □ □ □ □ □ □ □ ...
1 □ ■ □ □ □ □ □ □ □ □ ...
2 □ □ ■ □ □ □ □ □ □ □ ...
3 □ ■ ■ ■ □ □ □ □ □ □ ...
4 □ □ □ □ □ □ □ □ □ □ ...
-----------------------------------------
5. エラーハンドリング
- 本プログラムでは特別なエラーハンドリングは実装しない
- 配列の境界チェックはモジュロ演算(%演算子)によるトーラス構造で自動的に処理
6. 制約事項
- ボードサイズ: 20x20(固定)
- シミュレーション世代数: 3世代(固定)
- 表示間隔: 200ミリ秒(固定)
- 最小表示行数: 5行
- Python 3.6以上が必要
- 必要な標準ライブラリ: time
生成されたプログラム
import time
from typing import List, Tuple
class Board:
def __init__(self, width: int, height: int):
self.width = width
self.height = height
self.cells = [[False] * width for _ in range(height)]
def get(self, x: int, y: int) -> bool:
return self.cells[y][x]
def set(self, x: int, y: int, state: bool):
self.cells[y][x] = state
class Game:
def __init__(self, width: int, height: int):
self.board = Board(width, height)
def count_neighbors(self, x: int, y: int) -> int:
neighbors = 0
for dx in [-1, 0, 1]:
for dy in [-1, 0, 1]:
if dx == 0 and dy == 0:
continue
newX = (x + dx + self.board.width) % self.board.width
newY = (y + dy + self.board.height) % self.board.height
if self.board.get(newX, newY):
neighbors += 1
return neighbors
def next(self):
new_board = Board(self.board.width, self.board.height)
for y in range(self.board.height):
for x in range(self.board.width):
neighbors = self.count_neighbors(x, y)
current_state = self.board.get(x, y)
if current_state:
new_state = neighbors in [2, 3]
else:
new_state = neighbors == 3
new_board.set(x, y, new_state)
self.board = new_board
def set_cell(self, x: int, y: int, state: bool):
self.board.set(x, y, state)
def get_active_bounds(self) -> Tuple[int, int]:
minY = self.board.height
maxY = 0
for y in range(self.board.height):
for x in range(self.board.width):
if self.board.get(x, y):
minY = min(minY, y)
maxY = max(maxY, y)
if minY > maxY:
minY = 0
maxY = 4
minY = max(0, minY - 1)
maxY = min(self.board.height - 1, maxY + 1)
return (minY, maxY)
def print_board(self):
print(" ", end="")
for x in range(self.board.width):
print(f"{x:2}", end=" ")
print()
minY, maxY = self.get_active_bounds()
for y in range(minY, maxY + 1):
print(f"{y:2} ", end="")
for x in range(self.board.width):
cell = "■ " if self.board.get(x, y) else "□ "
print(cell, end="")
print()
def main():
WIDTH = 20
HEIGHT = 20
MAX_GENERATIONS = 3
DELAY = 0.2
game = Game(WIDTH, HEIGHT)
glider = [(1,0), (2,1), (0,2), (1,2), (2,2)]
for x, y in glider:
game.set_cell(x, y, True)
for generation in range(1, MAX_GENERATIONS + 1):
print(f"Generation: {generation} / {MAX_GENERATIONS}")
print("-" * 41)
game.print_board()
print("-" * 41)
if generation < MAX_GENERATIONS:
game.next()
time.sleep(DELAY)
print("Simulation completed!")
if __name__ == "__main__":
main()
生成されたプログラムの実行結果
Generation: 1 / 3
-----------------------------------------
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
0 □ ■ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
1 □ □ ■ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
2 ■ ■ ■ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
3 □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
-----------------------------------------
Generation: 2 / 3
-----------------------------------------
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
0 □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
1 ■ □ ■ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
2 □ ■ ■ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
3 □ ■ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
4 □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
-----------------------------------------
Generation: 3 / 3
-----------------------------------------
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
0 □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
1 □ □ ■ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
2 ■ □ ■ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
3 □ ■ ■ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
4 □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □ □
-----------------------------------------
Simulation completed!
列数の表示間隔が全体的に一マス分増えています。
まとめ
どのサンプルも集約(変換)後に表示が少し異なる部分はありましたが、機能的な問題はありませんでした。
今回、直接コードからコードに変換したわけではなく、詳細設計書の作成をはさんで詳細設計書からコードの生成を行っています。
設計書とコードを別々に生成したわけではないため、設計書とコードのつながりが強く、コードに間違いがあった際も設計書を読むことで問題のある箇所を特定しやすくなります。
場合によっては、コードではなく詳細設計書の方を修正してからllmを使いコードの再生成を行って問題を解決するというのもありかと思います。
おわりに
今回の場合はコードを生成するところまでですが、テストケースの作成や実行するノード、(pythonに変換するのが確定している場合)pyinstallerでexe化までを行うノードなど色々付け足してみるのも面白そうです。
長くなりましたが、読んでいただきありがとうございます。
また、次の機会があればよろしくお願いします。
Discussion