📝
PythonからWindows API をコールしてOSプロセスを処理する
実現したいこと
Windowsで起動中のプロセスに対して、Python
を通じて何らかの処理をしたい。
(特定のプロセスを見つけてキルする、等)
Python
で使えるモジュールの情報を調査すると、
などが、現在主な解決方法だと知る。
psutil
だと処理が重い感じがあって、subprocess
だと関数名の通り、別途プロセスを立ち上げる(んですよね?)必要があり、しかもそのプロセスで実行するコードは、単独ファイルとして管理、あるいはプロジェクトリソースに内蔵する必要がある。実現したいこと(特定のプロセスを見つけてキルする)に対して、やることが大袈裟だなぁ〜と感じた。
(まさに個人的感想です。)
なので、Python
から直接システムAPI(今回はWindows API
)をコールして何とかできないかを調査・実装してみた。
概念と定義の説明
そもそもWindows APIとは
プログラムから直接OSの機能を使える、Windowsが提供する関数。
今回使うWindows API
-
kernel32.dll
標準ライブラリ -
psapi.dll
Windowsに標準搭載されているプロセス情報取得用のライブラリ
PythonでWindows APIを使う段取り
モジュールをインポート
import ctypes # C言語の関数やデータ型をPythonから使うためのライブラリ
from ctypes import wintypes # Windows固有型を定義したライブラリ
APIのDLLをロード
psapi = ctypes.WinDLL('Psapi.dll')
kernel32 = ctypes.WinDLL('kernel32.dll')
定数を定義
MAX_PATH = 260
MAX_PROCESSES = 2048
PROCESS_QUERY_INFORMATION = 0x0400
PROCESS_VM_READ = 0x0010
PROCESS_TERMINATE = 0x0001
「何を根拠に定数の名称と値を定義してるの?」の疑問があったのだが、
MAX_PATH
は下記のように定義されているらしい(あとで実際のファイルを確認する)。
windows.h
#define MAX_PATH 260
ちなみに、ファイルパスの最大文字数(NULL終端込み)は、
260 = ドライブ文字(1) + コロン(1) + バックスラッシュ(1) + フォルダ/ファイル名(最大 256) + NULL 終端(1)
MAX_PROCESSES
は実際の稼働環境に基づいて設定する必要がある。
また、PROCESS_*
の0x
系の定義は、MicrosoftのAPIリフェレンスのヘッダーファイルのところを参照するのが一般的なやり方らしい。
関数の戻り値型設定
kernel32.OpenProcess.argtypes = [wintypes.DWORD, wintypes.BOOL, wintypes.DWORD]
kernel32.OpenProcess.restype = wintypes.HANDLE
kernel32.CloseHandle.argtypes = [wintypes.HANDLE]
kernel32.CloseHandle.restype = wintypes.BOOL
kernel32.TerminateProcess.argtypes = [wintypes.HANDLE, wintypes.UINT]
kernel32.TerminateProcess.restype = wintypes.BOOL
psapi.EnumProcesses.argtypes = [ctypes.POINTER(wintypes.DWORD), wintypes.DWORD, ctypes.POINTER(wintypes.DWORD)]
psapi.EnumProcesses.restype = wintypes.BOOL
psapi.GetModuleBaseNameW.argtypes = [wintypes.HANDLE, wintypes.HANDLE, wintypes.LPWSTR, wintypes.DWORD]
psapi.GetModuleBaseNameW.restype = wintypes.DWORD
EXIT_SUCCESS = 0
型安全性と正しいデータ受け渡しのため
(信頼性・可読性のため)
実装全コード
import ctypes
import ctypes.wintypes as wintypes
# 必要なDLL
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
psapi = ctypes.WinDLL("psapi", use_last_error=True)
# 定義
MAX_PATH = 260
MAX_PROCESSES = 2048
PROCESS_QUERY_INFORMATION = 0x0400
PROCESS_VM_READ = 0x0010
PROCESS_TERMINATE = 0x0001
# OpenProcess 引数・戻り値型設定
kernel32.OpenProcess.argtypes = [wintypes.DWORD, wintypes.BOOL, wintypes.DWORD]
kernel32.OpenProcess.restype = wintypes.HANDLE
kernel32.CloseHandle.argtypes = [wintypes.HANDLE]
kernel32.CloseHandle.restype = wintypes.BOOL
kernel32.TerminateProcess.argtypes = [wintypes.HANDLE, wintypes.UINT]
kernel32.TerminateProcess.restype = wintypes.BOOL
psapi.EnumProcesses.argtypes = [ctypes.POINTER(wintypes.DWORD), wintypes.DWORD, ctypes.POINTER(wintypes.DWORD)]
psapi.EnumProcesses.restype = wintypes.BOOL
psapi.GetModuleBaseNameW.argtypes = [wintypes.HANDLE, wintypes.HANDLE, wintypes.LPWSTR, wintypes.DWORD]
psapi.GetModuleBaseNameW.restype = wintypes.DWORD
EXIT_SUCCESS = 0
def kill_process_by_windows_api(target_name="dummy.exe"):
"""
指定したプロセス名を列挙し、TerminateProcessで終了させる
"""
arr = (wintypes.DWORD * MAX_PROCESSES)()
bytes_returned = wintypes.DWORD()
if not psapi.EnumProcesses(arr, ctypes.sizeof(arr), ctypes.byref(bytes_returned)):
raise ctypes.WinError(ctypes.get_last_error())
num_procs = bytes_returned.value // ctypes.sizeof(wintypes.DWORD)
for pid in arr[:num_procs]:
if pid in (0, 4):
continue # Idle Process や System Process はスキップ
# 情報取得 + Kill 権限をまとめて確保
access = PROCESS_QUERY_INFORMATION | PROCESS_VM_READ | PROCESS_TERMINATE
h_process = kernel32.OpenProcess(access, False, pid)
if not h_process:
continue # 開けないプロセス(権限不足など)はスキップ
try:
exe_name = (wintypes.WCHAR * MAX_PATH)()
if psapi.GetModuleBaseNameW(h_process, None, exe_name, MAX_PATH) > 0:
if exe_name.value.lower() == target_name.lower():
if not kernel32.TerminateProcess(h_process, EXIT_SUCCESS):
raise ctypes.WinError(ctypes.get_last_error())
print(f"Killed {exe_name.value} (PID={pid})")
finally:
# どんな場合でも必ずハンドルを閉じる
kernel32.CloseHandle(h_process)
if __name__ == '__main__':
kill_process_by_windows_api("dummy.exe")
使いやすいように関数化
プロセスの名称を取得
get_process_name.py
import ctypes
import ctypes.wintypes as wintypes
# Windows API のロード
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
psapi = ctypes.WinDLL("psapi", use_last_error=True)
# 定数
PROCESS_QUERY_INFORMATION = 0x0400
PROCESS_VM_READ = 0x0010
MAX_PATH = 260
# 戻り値・引数の型を定義
kernel32.OpenProcess.argtypes = [wintypes.DWORD, wintypes.BOOL, wintypes.DWORD]
kernel32.OpenProcess.restype = wintypes.HANDLE
kernel32.CloseHandle.argtypes = [wintypes.HANDLE]
kernel32.CloseHandle.restype = wintypes.BOOL
psapi.GetModuleBaseNameW.argtypes = [
wintypes.HANDLE, wintypes.HMODULE,
wintypes.LPWSTR, wintypes.DWORD
]
psapi.GetModuleBaseNameW.restype = wintypes.DWORD
def get_process_name(pid: int) -> str | None:
"""
プロセスIDからプロセス名を取得する。
取得できない場合は None を返す。
"""
# プロセスを開く
h_process = kernel32.OpenProcess(
PROCESS_QUERY_INFORMATION | PROCESS_VM_READ,
False,
pid
)
if not h_process:
return None # アクセス拒否や存在しないPIDなど
try:
exe_name_buf = (wintypes.WCHAR * MAX_PATH)()
if psapi.GetModuleBaseNameW(h_process, None, exe_name_buf, MAX_PATH) > 0:
return exe_name_buf.value
else:
return None
finally:
kernel32.CloseHandle(h_process)
# --- 動作例 ---
if __name__ == "__main__":
import os
current_pid = os.getpid()
print(f"PID={current_pid}, Name={get_process_name(current_pid)}")
プロセスの親プロセルIDを取得
get_parent_pid.py
import ctypes
import ctypes.wintypes as wintypes
import os
# --- Windows API のロード ---
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
# --- 定数 ---
TH32CS_SNAPPROCESS = 0x00000002
MAX_PATH = 260
INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value
# --- 構造体定義 ---
class PROCESSENTRY32(ctypes.Structure):
_fields_ = [
("dwSize", wintypes.DWORD),
("cntUsage", wintypes.DWORD),
("th32ProcessID", wintypes.DWORD),
("th32DefaultHeapID", ctypes.c_void_p), # ULONG_PTR 相当
("th32ModuleID", wintypes.DWORD),
("cntThreads", wintypes.DWORD),
("th32ParentProcessID", wintypes.DWORD), # 親プロセス ID
("pcPriClassBase", wintypes.LONG),
("dwFlags", wintypes.DWORD),
("szExeFile", wintypes.WCHAR * MAX_PATH) # 実行ファイル名
]
# --- 関数定義 ---
CreateToolhelp32Snapshot = kernel32.CreateToolhelp32Snapshot
CreateToolhelp32Snapshot.argtypes = [wintypes.DWORD, wintypes.DWORD]
CreateToolhelp32Snapshot.restype = wintypes.HANDLE
Process32First = kernel32.Process32FirstW
Process32First.argtypes = [wintypes.HANDLE, ctypes.POINTER(PROCESSENTRY32)]
Process32First.restype = wintypes.BOOL
Process32Next = kernel32.Process32NextW
Process32Next.argtypes = [wintypes.HANDLE, ctypes.POINTER(PROCESSENTRY32)]
Process32Next.restype = wintypes.BOOL
CloseHandle = kernel32.CloseHandle
CloseHandle.argtypes = [wintypes.HANDLE]
CloseHandle.restype = wintypes.BOOL
# --- 親プロセスIDを取得する関数 ---
def get_parent_pid(pid: int) -> int | None:
"""
指定したプロセスIDの親プロセスIDを返す。
見つからない場合は None を返す。
"""
snap = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0)
if snap == INVALID_HANDLE_VALUE:
raise ctypes.WinError(ctypes.get_last_error())
entry = PROCESSENTRY32()
entry.dwSize = ctypes.sizeof(PROCESSENTRY32)
try:
if not Process32First(snap, ctypes.byref(entry)):
raise ctypes.WinError(ctypes.get_last_error())
while True:
if entry.th32ProcessID == pid:
return int(entry.th32ParentProcessID)
if not Process32Next(snap, ctypes.byref(entry)):
break
return None
finally:
CloseHandle(snap)
# --- 単独で動作させる部分 ---
if __name__ == "__main__":
current_pid = os.getpid()
parent_pid = get_parent_pid(current_pid)
print(f"Current PID: {current_pid}")
print(f"Parent PID : {parent_pid}")
print('')
Discussion