📝

PythonからWindows API をコールしてOSプロセスを処理する

に公開

実現したいこと

Windowsで起動中のプロセスに対して、Pythonを通じて何らかの処理をしたい。
特定のプロセスを見つけてキルする、等)

Pythonで使えるモジュールの情報を調査すると、

などが、現在主な解決方法だと知る。

psutilだと処理が重い感じがあって、subprocessだと関数名の通り、別途プロセスを立ち上げる(んですよね?)必要があり、しかもそのプロセスで実行するコードは、単独ファイルとして管理、あるいはプロジェクトリソースに内蔵する必要がある。実現したいこと(特定のプロセスを見つけてキルする)に対して、やることが大袈裟だなぁ〜と感じた。
(まさに個人的感想です。)

なので、Pythonから直接システムAPI(今回はWindows API)をコールして何とかできないかを調査・実装してみた。

概念と定義の説明

そもそもWindows APIとは

プログラムから直接OSの機能を使える、Windowsが提供する関数。

今回使うWindows API

  • kernel32.dll標準ライブラリ
  • psapi.dllWindowsに標準搭載されているプロセス情報取得用のライブラリ

PythonでWindows APIを使う段取り

モジュールをインポート

import ctypes               # C言語の関数やデータ型をPythonから使うためのライブラリ
from ctypes import wintypes # Windows固有型を定義したライブラリ

APIのDLLをロード

psapi    = ctypes.WinDLL('Psapi.dll')
kernel32 = ctypes.WinDLL('kernel32.dll')

定数を定義

MAX_PATH                  = 260
MAX_PROCESSES             = 2048
PROCESS_QUERY_INFORMATION = 0x0400
PROCESS_VM_READ           = 0x0010
PROCESS_TERMINATE         = 0x0001

「何を根拠に定数の名称と値を定義してるの?」の疑問があったのだが、
MAX_PATHは下記のように定義されているらしい(あとで実際のファイルを確認する)。

windows.h
#define MAX_PATH 260

ちなみに、ファイルパスの最大文字数(NULL終端込み)は、
260 = ドライブ文字(1) + コロン(1) + バックスラッシュ(1) + フォルダ/ファイル名(最大 256) + NULL 終端(1)

MAX_PROCESSESは実際の稼働環境に基づいて設定する必要がある。

また、PROCESS_*0x系の定義は、MicrosoftのAPIリフェレンスのヘッダーファイルのところを参照するのが一般的なやり方らしい。

関数の戻り値型設定

kernel32.OpenProcess.argtypes = [wintypes.DWORD, wintypes.BOOL, wintypes.DWORD]
kernel32.OpenProcess.restype  = wintypes.HANDLE
kernel32.CloseHandle.argtypes = [wintypes.HANDLE]
kernel32.CloseHandle.restype  = wintypes.BOOL
kernel32.TerminateProcess.argtypes = [wintypes.HANDLE, wintypes.UINT]
kernel32.TerminateProcess.restype  = wintypes.BOOL
psapi.EnumProcesses.argtypes = [ctypes.POINTER(wintypes.DWORD), wintypes.DWORD, ctypes.POINTER(wintypes.DWORD)]
psapi.EnumProcesses.restype  = wintypes.BOOL
psapi.GetModuleBaseNameW.argtypes = [wintypes.HANDLE, wintypes.HANDLE, wintypes.LPWSTR, wintypes.DWORD]
psapi.GetModuleBaseNameW.restype  = wintypes.DWORD

EXIT_SUCCESS = 0

型安全性と正しいデータ受け渡しのため
(信頼性・可読性のため)

実装全コード

import ctypes
import ctypes.wintypes as wintypes

# 必要なDLL
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
psapi    = ctypes.WinDLL("psapi", use_last_error=True)

# 定義
MAX_PATH                  = 260
MAX_PROCESSES             = 2048
PROCESS_QUERY_INFORMATION = 0x0400
PROCESS_VM_READ           = 0x0010
PROCESS_TERMINATE         = 0x0001

# OpenProcess 引数・戻り値型設定
kernel32.OpenProcess.argtypes = [wintypes.DWORD, wintypes.BOOL, wintypes.DWORD]
kernel32.OpenProcess.restype  = wintypes.HANDLE
kernel32.CloseHandle.argtypes = [wintypes.HANDLE]
kernel32.CloseHandle.restype  = wintypes.BOOL
kernel32.TerminateProcess.argtypes = [wintypes.HANDLE, wintypes.UINT]
kernel32.TerminateProcess.restype  = wintypes.BOOL
psapi.EnumProcesses.argtypes = [ctypes.POINTER(wintypes.DWORD), wintypes.DWORD, ctypes.POINTER(wintypes.DWORD)]
psapi.EnumProcesses.restype  = wintypes.BOOL
psapi.GetModuleBaseNameW.argtypes = [wintypes.HANDLE, wintypes.HANDLE, wintypes.LPWSTR, wintypes.DWORD]
psapi.GetModuleBaseNameW.restype  = wintypes.DWORD

EXIT_SUCCESS = 0


def kill_process_by_windows_api(target_name="dummy.exe"):
    """
    指定したプロセス名を列挙し、TerminateProcessで終了させる
    """
    arr = (wintypes.DWORD * MAX_PROCESSES)()
    bytes_returned = wintypes.DWORD()

    if not psapi.EnumProcesses(arr, ctypes.sizeof(arr), ctypes.byref(bytes_returned)):
        raise ctypes.WinError(ctypes.get_last_error())

    num_procs = bytes_returned.value // ctypes.sizeof(wintypes.DWORD)

    for pid in arr[:num_procs]:
        if pid in (0, 4):
            continue  # Idle Process や System Process はスキップ

        # 情報取得 + Kill 権限をまとめて確保
        access = PROCESS_QUERY_INFORMATION | PROCESS_VM_READ | PROCESS_TERMINATE
        h_process = kernel32.OpenProcess(access, False, pid)

        if not h_process:
            continue  # 開けないプロセス(権限不足など)はスキップ

        try:
            exe_name = (wintypes.WCHAR * MAX_PATH)()
            if psapi.GetModuleBaseNameW(h_process, None, exe_name, MAX_PATH) > 0:
                if exe_name.value.lower() == target_name.lower():
                    if not kernel32.TerminateProcess(h_process, EXIT_SUCCESS):
                        raise ctypes.WinError(ctypes.get_last_error())
                    print(f"Killed {exe_name.value} (PID={pid})")

        finally:
            # どんな場合でも必ずハンドルを閉じる
            kernel32.CloseHandle(h_process)

if __name__ == '__main__':
    kill_process_by_windows_api("dummy.exe")

使いやすいように関数化

プロセスの名称を取得

get_process_name.py
import ctypes
import ctypes.wintypes as wintypes

# Windows API のロード
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
psapi    = ctypes.WinDLL("psapi", use_last_error=True)

# 定数
PROCESS_QUERY_INFORMATION = 0x0400
PROCESS_VM_READ           = 0x0010
MAX_PATH = 260

# 戻り値・引数の型を定義
kernel32.OpenProcess.argtypes = [wintypes.DWORD, wintypes.BOOL, wintypes.DWORD]
kernel32.OpenProcess.restype  = wintypes.HANDLE
kernel32.CloseHandle.argtypes = [wintypes.HANDLE]
kernel32.CloseHandle.restype  = wintypes.BOOL

psapi.GetModuleBaseNameW.argtypes = [
    wintypes.HANDLE, wintypes.HMODULE,
    wintypes.LPWSTR, wintypes.DWORD
]
psapi.GetModuleBaseNameW.restype = wintypes.DWORD


def get_process_name(pid: int) -> str | None:
    """
    プロセスIDからプロセス名を取得する。
    取得できない場合は None を返す。
    """
    # プロセスを開く
    h_process = kernel32.OpenProcess(
        PROCESS_QUERY_INFORMATION | PROCESS_VM_READ,
        False,
        pid
    )
    if not h_process:
        return None  # アクセス拒否や存在しないPIDなど

    try:
        exe_name_buf = (wintypes.WCHAR * MAX_PATH)()
        if psapi.GetModuleBaseNameW(h_process, None, exe_name_buf, MAX_PATH) > 0:
            return exe_name_buf.value
        else:
            return None
    finally:
        kernel32.CloseHandle(h_process)


# --- 動作例 ---
if __name__ == "__main__":
    import os
    current_pid = os.getpid()
    print(f"PID={current_pid}, Name={get_process_name(current_pid)}")

プロセスの親プロセルIDを取得

get_parent_pid.py
import ctypes
import ctypes.wintypes as wintypes
import os

# --- Windows API のロード ---
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)

# --- 定数 ---
TH32CS_SNAPPROCESS = 0x00000002
MAX_PATH = 260
INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value

# --- 構造体定義 ---
class PROCESSENTRY32(ctypes.Structure):
    _fields_ = [
        ("dwSize", wintypes.DWORD),
        ("cntUsage", wintypes.DWORD),
        ("th32ProcessID", wintypes.DWORD),
        ("th32DefaultHeapID", ctypes.c_void_p),  # ULONG_PTR 相当
        ("th32ModuleID", wintypes.DWORD),
        ("cntThreads", wintypes.DWORD),
        ("th32ParentProcessID", wintypes.DWORD),  # 親プロセス ID
        ("pcPriClassBase", wintypes.LONG),
        ("dwFlags", wintypes.DWORD),
        ("szExeFile", wintypes.WCHAR * MAX_PATH)  # 実行ファイル名
    ]

# --- 関数定義 ---
CreateToolhelp32Snapshot = kernel32.CreateToolhelp32Snapshot
CreateToolhelp32Snapshot.argtypes = [wintypes.DWORD, wintypes.DWORD]
CreateToolhelp32Snapshot.restype  = wintypes.HANDLE

Process32First = kernel32.Process32FirstW
Process32First.argtypes = [wintypes.HANDLE, ctypes.POINTER(PROCESSENTRY32)]
Process32First.restype  = wintypes.BOOL

Process32Next = kernel32.Process32NextW
Process32Next.argtypes = [wintypes.HANDLE, ctypes.POINTER(PROCESSENTRY32)]
Process32Next.restype  = wintypes.BOOL

CloseHandle = kernel32.CloseHandle
CloseHandle.argtypes = [wintypes.HANDLE]
CloseHandle.restype  = wintypes.BOOL

# --- 親プロセスIDを取得する関数 ---
def get_parent_pid(pid: int) -> int | None:
    """
    指定したプロセスIDの親プロセスIDを返す。
    見つからない場合は None を返す。
    """
    snap = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0)
    if snap == INVALID_HANDLE_VALUE:
        raise ctypes.WinError(ctypes.get_last_error())

    entry = PROCESSENTRY32()
    entry.dwSize = ctypes.sizeof(PROCESSENTRY32)

    try:
        if not Process32First(snap, ctypes.byref(entry)):
            raise ctypes.WinError(ctypes.get_last_error())

        while True:
            if entry.th32ProcessID == pid:
                return int(entry.th32ParentProcessID)
            if not Process32Next(snap, ctypes.byref(entry)):
                break
        return None
    finally:
        CloseHandle(snap)

# --- 単独で動作させる部分 ---
if __name__ == "__main__":
    current_pid = os.getpid()
    parent_pid = get_parent_pid(current_pid)
    print(f"Current PID: {current_pid}")
    print(f"Parent PID : {parent_pid}")
    print('')

Discussion