🐥

麻雀AI試してみて勉強してみた。

2024/06/21に公開

麻雀が上手くなりたかった。

業務でロジカルシンキングの例の一つに出てきたのが麻雀でした。
麻雀とは。

  • ポーカーみたいに「役」を34種×4個の牌で作るゲームである。
  • 役を揃えるには人より、先に上がる必要がある。
  • 最終的に1番多く点数を持っている人が勝ちになります。

ただ、役の計算に「席順」や「ドラ」というボーナス要素もある楽しいでゲームです。

これが役を揃えるのがむずかしい・・・。

実践してみてたら、打ち方の反省のをしたくなりました。反省をするためには他の可能性も検討しないといけない。そのための一つのツールとして、AIによる予測をやってみたくなりました。

環境設定

自分の試した環境は、

  • Google Colab
  • pyTorch
    です。
    コピーすれば時間をかければ誰でもできるので試してみてください。

牌譜の情報源。

天鳳という麻雀ゲームに課金者が配布を収集できる機能があるので、そこから抽出しました。
形式としては,
xmlファイルです。

  • 麻雀牌がどの順に出てくるか
  • 対戦形式と対戦番号
  • 対局者情報
  • ゲーム開始時の点数情報
    • ゲームの親
    • リーチ
    • メンツ(3個1組)の情報
  • ドラ
  • 上がった情報
    • あがり点
    • 点数移動
  • 流局
    の情報から基本的にはなっている。

詳しくは内容は以下を参照してください。
https://m77.hatenablog.com/entry/2017/05/21/214529#AGARI

覚え込ませたデータはこちらです。
参考記事
https://qiita.com/neuzen-ryuki/items/48fa468fa0b1c502ed1e
実際のデータ
https://drive.google.com/drive/folders/19lXnBEPuaro3gyBvqWWz2ZGg8h0USHjJ

構成の解説。

構成は以下のとおりです

  1. データ準備:

    • 圧縮ファイルから麻雀データを取り出して、使いやすい形に変換します。
  2. データセットの作成:

    • データを扱いやすくするためのクラスを作ります。
  3. データの分割と読み込み:

    • データを訓練用、検証用、テスト用に分けて、効率的に読み込めるようにします。
  4. モデルの定義と学習:

    • 麻雀の役を予測するためのモデルを作って、データを使って学習させます。
  5. 結果の可視化:

    • 学習の進行状況をグラフにして確認します。
  6. テストと評価:

    • 学習したモデルをテストデータで試して、どれだけ正確に予測できるかを確認します。

実際のコード

ライブラリのインストール

!pip install japanize-matplotlib
!pip install torchvision
!pip install grad-cam
  • Matplotlibで日本語フォントを使用可能にするライブラリをインストール。

  • PyTorchの画像処理ライブラリであるtorchvisionをインストール。

  • 画像分類モデルの可視化に使用するGrad-CAMライブラリをインストール。

麻雀AI学習のためのライブラリインポート

import os
import tarfile
import tqdm
import xml.etree.ElementTree as ET
import pandas as pd
import numpy as np
import collections
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
import torch.optim as optim
import matplotlib.pyplot as plt
import japanize_matplotlib
from torchvision import models
from torchvision.utils import make_grid
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.image import show_cam_on_image, preprocess_image

麻雀AI学習のためのライブラリインポート

import os:

  • オペレーティングシステムとのインターフェースを提供する標準ライブラリ。

import tarfile:

  • tarアーカイブの読み書きを行うための標準ライブラリ。

import tqdm:

  • 進行状況バーを表示するライブラリ。

import xml.etree.ElementTree as ET:

  • XMLデータのパースおよび操作を行う標準ライブラリ。

import pandas as pd:

  • データ操作や解析を行うためのライブラリ。

import numpy as np:

  • 数値計算を効率的に行うためのライブラリ。

import collections:

  • 高度なコレクションデータ型を提供する標準ライブラリ。

import torch:

  • PyTorchの基本モジュール。

import torch.nn as nn:

  • PyTorchのニューラルネットワークモジュール。

from sklearn.model_selection import train_test_split:

  • データセットをトレーニングセットとテストセットに分割する関数。

import torch.optim as optim:

  • PyTorchの最適化アルゴリズムモジュール。

import matplotlib.pyplot as plt:

  • データの可視化を行うためのMatplotlibライブラリ。

import japanize_matplotlib:

  • Matplotlibで日本語フォントを使用可能にするライブラリ。

from torchvision import models:

  • 事前訓練された画像認識モデルを含むtorchvisionのモジュール。

from torchvision.utils import make_grid:

  • 画像のグリッドを作成するための関数。

from pytorch_grad_cam import GradCAM:

  • Grad-CAMを使用して画像分類モデルの内部を可視化するライブラリ。

from pytorch_grad_cam.utils.image import show_cam_on_image, preprocess_image:

  • Grad-CAMの結果を画像上に表示するためのユーティリティ関数、および画像の前処理を行う関数。

麻雀AI学習のためのデータセット準備

前述のzipファイルをダウンロードして、自分のマイドライブに保存します。
(どの年代でもいいです。nameとファイルの名前やpathとフォルダーの構成は一緒にしてください)

from google.colab import drive
drive.mount('/content/drive')

local_folder_name = 'tenho_2019'
drive_folder_path = "/content/drive/MyDrive/mahjong/tenhou" # 2019.tar.xzを置いたフォルダパス
file_name = "2019.tar.xz"

with tarfile.open(os.path.join(drive_folder_path, file_name)) as tar:
    tar.extractall("/content/tenho_2019/")

from google.colab import drive:

  • Google Colabのドライブ機能をインポート。

drive.mount('/content/drive'):

  • Google ドライブをColab環境にマウントし、ドライブの内容にアクセスできるようにします。

local_folder_name = 'tenho_2019':

  • 解凍先のローカルフォルダ名を設定。

drive_folder_path = "/content/drive/MyDrive/mahjong/tenhou":

  • Google ドライブ内に2019.tar.xzファイルを置いたフォルダのパスを設定。

file_name = "2019.tar.xz":

  • 使用するtarアーカイブファイルの名前を設定。

with tarfile.open(os.path.join(drive_folder_path, file_name)) as tar::

  • 指定したtarアーカイブファイルを開きます。

tar.extractall("/content/tenho_2019/"):

  • アーカイブの内容を指定したローカルフォルダに解凍します。

XMLデータから和了情報を抽出しデータフレームに格納

XMLファイルから和了情報を抽出してデータフレーム(データを表形式で管理)に追加する処理を行います。

folder_path = "/content/tenho_2019/2019/"
files = os.listdir(folder_path)

agari_df = pd.DataFrame()

for file in tqdm.tqdm(files):
  test = ET.parse(os.path.join(folder_path, file))
  root = test.getroot()

  for child in root:
    if child.tag=="AGARI":
      agari = pd.DataFrame(data = child.attrib, index=[0])
      agari_df = pd.concat([agari_df, agari], ignore_index=True)
  1. フォルダパスとファイルリストの設定:

folder_path = "/content/tenho_2019/2019/":

  • 解凍されたデータセットのフォルダパスを設定。

files = os.listdir(folder_path):

  • 指定フォルダ内のファイル一覧を取得。

 

  1. データフレームの初期化:
    agari_df = pd.DataFrame():
  • 和了(アガリ)情報を格納するための空のデータフレームを作成。

 

  1. XMLファイルの解析とデータ抽出:

for file in tqdm.tqdm(files)::

  • フォルダ内の各ファイルに対してループ処理を開始し、進行状況をtqdmで表示。

test = ET.parse(os.path.join(folder_path, file)):

  • XMLファイルをパース(解析)。

root = test.getroot():

  • XMLのルート要素を取得。

 

  1. 和了情報の抽出とデータフレームへの追加:

for child in root::

  • ルート要素の子要素を順に処理。

if child.tag=="AGARI"::

  • 子要素のタグが「AGARI」の場合に処理を実行。

agari = pd.DataFrame(data = child.attrib, index=[0]):

  • 和了情報をデータフレームに変換。

agari_df = pd.concat([agari_df, agari], ignore_index=True):

  • 取得した和了情報を既存のデータフレームに追加(インデックスを無視して結合)。

和了情報データフレームの内容確認

print(len(agari_df))
agari_df.head()
  1. データフレームの行数確認:
    データフレームの行数と先頭の5行を表示します。
    print(len(agari_df)):
  • データフレームの行数を出力します。これにより、抽出された和了情報の数を確認できます。

 

  1. データフレームの先頭部分確認:

agari_df.head():

  • データフレームの先頭5行を出力します。これにより、抽出された和了情報のサンプルを確認できます。

和了情報から役リストと翻数を抽出する関数

与えられた役のリストから役と翻数を計算し、リストと翻数を返します。

def yaku_append(x):
  han = 0
  yaku_normal = x[0].split(",")
  yakulist_normal = []
  
  if len(yaku_normal) > 1:
    yaku_normal = [yaku_normal[i:i + 2] for i in range(0, len(yaku_normal), 2)]
    yakulist_normal = [i[0] for i in yaku_normal if i[1] != '0']
    han = sum([int(i[1]) for i in yaku_normal if i[1] != '0'])
  
  yakulist = yakulist_normal + x[1].split(",")
  yakulist = [int(i) for i in yakulist if i != "nan"]
  return yakulist, han
  1. 関数の目的:

    • 和了情報から役リストと翻数を抽出します。
  2. 役リストと翻数の初期化:
    han = 0:

  • 翻数の初期値を0に設定します。

yaku_normal = x[0].split(","):

  • カンマ区切りで役情報を分割します。

yakulist_normal = []:

  • 役リストを格納するための空リストを作成します。

 

  1. 通常の役の処理:

    if len(yaku_normal) > 1::

    • 役情報が存在する場合に処理を開始します。

    yaku_normal = [yaku_normal[i:i + 2] for i in range(0, len(yaku_normal), 2)]:

    • 役情報をペア(役のIDと翻数)に分割します。

    yakulist_normal = [i[0] for i in yaku_normal if i[1] != '0']:

    • 翻数が0でない役のIDを役リストに追加します。

    han = sum([int(i[1]) for i in yaku_normal if i[1] != '0']):

    • 翻数が0でない役の翻数を合計します。

 

  1. 特別な役の処理:

    yakulist = yakulist_normal + x[1].split(","):

    • 通常の役リストと特別な役リストを結合します。

    yakulist = [int(i) for i in yakulist if i != "nan"]:

    • リスト内の役IDを整数に変換し、「nan」を除外します。

 

  1. 役リストと翻数の返却:

    return yakulist, han:

    • 役リストと翻数を返却します。

和了情報から役のラベルを生成する関数

リストから使用する役のラベル(これがこの役だよって決める)を作成し、役名リストとともに返します。

def yaku_label(yaku_list):
  yaku = [
      # //// 一飜
      "門前清自摸和","立直","一発","槍槓","嶺上開花",
      "海底摸月","河底撈魚","平和","断幺九","一盃口",
      "自風 東","自風 南","自風 西","自風 北",
      "場風 東","場風 南","場風 西","場風 北",
      "役牌 白","役牌 發","役牌 中",
      # //// 二飜
      "両立直","七対子","混全帯幺九","一気通貫","三色同順",
      "三色同刻","三槓子","対々和","三暗刻","小三元","混老頭",
      # //// 三飜
      "二盃口","純全帯幺九","混一色",
      # //// 六飜
      "清一色",
      # //// 満貫
      "人和",
      # //// 役満
      "天和","地和","大三元","四暗刻","四暗刻単騎","字一色",
      "緑一色","清老頭","九蓮宝燈","純正九蓮宝燈","国士無双",
      "国士無双13面","大四喜","小四喜","四槓子",
      # //// 懸賞役
      "ドラ","裏ドラ","赤ドラ"
  ]

  yaku_label_dic = dict(zip(yaku, range(len(yaku))))
  label_yaku_dic = dict(zip(range(len(yaku)), yaku))

  # 利用する役
  yakuuse = [
    # //// 一飜
    "断幺九",
    "役牌 白","役牌 發","役牌 中",
    # //// 二飜
    "混全帯幺九","一気通貫","三色同順",
    "三色同刻","三槓子","対々和","三暗刻","小三元","混老頭",
    # //// 三飜
    "二盃口","純全帯幺九","混一色",
    # //// 六飜
    "清一色"
  ]

  yakuuse_label_dic = dict(zip(yakuuse, range(len(yakuuse))))
  label_yakuuse_dic = dict(zip(range(len(yakuuse)), yakuuse))

  yaku_str_list = [label_yaku_dic[i] for i in yaku_list] # あがり役名リスト
  yakuuse_str_list = list(set(yaku_str_list) & set(yakuuse)) # ラベルに使うあがり役名リスト

  label_num = len(yakuuse)
  yaku_label = [0] * label_num

  yakuuse_id_list = [yakuuse_label_dic[yakuuse_str] for yakuuse_str in yakuuse_str_list]

  for yakuuse_id in yakuuse_id_list:
    yaku_label[yakuuse_id] = 1

  return yaku_label, yakuuse_str_list
  1. 役とラベルの定義:

yaku:

  • 全ての役名のリストを定義します。
    yaku_label_dic:
  • 役名からインデックスへのマッピングを定義します。
    label_yaku_dic:
  • インデックスから役名へのマッピングを定義します。

 

  1. 利用する役の定義:

yakuuse:

  • 使用する役名のリストを定義します。

yakuuse_label_dic:

  • 使用する役名からインデックスへのマッピングを定義します。

label_yakuuse_dic:

  • 使用するインデックスから役名へのマッピングを定義します。

 

  1. 和了役リストの処理:

yaku_str_list = [label_yaku_dic[i] for i in yaku_list]:

  • 役リストから役名リストを生成します。

yakuuse_str_list = list(set(yaku_str_list) & set(yakuuse)):

  • 利用する役名のリストと和了役名リストの交差を取得します。

 

  1. ラベルの初期化と設定:

label_num = len(yakuuse):

  • 使用する役の数を取得します。

yaku_label = [0] * label_num:

  • ラベルリストを0で初期化します。

yakuuse_id_list = [yakuuse_label_dic[yakuuse_str] for yakuuse_str in yakuuse_str_list]:

  • 使用する役名リストからインデックスリストを生成します。

 

  1. ラベルの設定:

for yakuuse_id in yakuuse_id_list::

  • 使用する役のインデックスに対応するラベルを1に設定します。

 

  1. ラベルと役名リストの返却:

return yaku_label, yakuuse_str_list:

  • ラベルリストと使用する役名リストを返却します。

鳴き情報の処理関数

与えられた鳴き情報から対応する牌のIDと文字列を抽出し、平坦化されたリストとして返します。鳴き情報が存在しない場合は、空のリストを返します。

def naki_treatment(x):
  if isinstance(x, str):
    furo_hai_id = [furo_to_hai(i) for i in x.split(",")]
    furo_hai_str = [[hai_convert_str(j) for j in i] for i in furo_hai_id]
    return sum(furo_hai_id, []), sum(furo_hai_str, [])
  else:
    return [], []
  1. 鳴き情報の処理関数設定:

    • def naki_treatment(x)::
      • 鳴き情報を処理し、牌IDリストと文字列表現を返す関数。
  2. 鳴き情報が文字列であるか確認:

    • if isinstance(x, str)::
      • 入力が文字列である場合に処理を実行します。

 

  1. 鳴き情報のID変換:

    furo_hai_id = [furo_to_hai(i) for i in x.split(",")]:

    • カンマで分割された鳴き情報を各鳴きの牌IDに変換します。

 

  1. 牌IDを文字列に変換:

    furo_hai_str = [[hai_convert_str(j) for j in i] for i in furo_hai_id]:

    • 牌IDをそれぞれ対応する文字列に変換します。

 

  1. リストの平坦化:

    return sum(furo_hai_id, []), sum(furo_hai_str, []):

    • 鳴き牌のIDリストと文字列リストをそれぞれ平坦化して返します。

 

  1. 鳴き情報が存在しない場合:

    else::

    • 鳴き情報が文字列でない場合、空のリストを返します。

 

牌IDを文字列に変換する関数

与えられた牌IDを対応する文字列(萬子、筒子、索子、風牌、三元牌のいずれか)に変換し、結果を返します。

def hai_convert_str(id):
  id_int = int(id)
  hai = ["一", "二", "三", "四", "五", "六", "七", "八", "九", # 萬子
         "①", "②", "③", "④", "⑤", "⑥", "⑦", "⑧", "⑨", # 筒子
         "1", "2", "3", "4", "5", "6", "7", "8", "9", # 索子
         "東", "南", "西", "北", "白", "發", "中"]
  return hai[id_int >> 2]
  1. IDの整数変換:
    • id_int = int(id):
      • 入力された牌IDを整数に変換します。

 

  1. 牌の文字列リスト:
    • hai = [...]:
      • 各牌に対応する文字列をリストに格納します。
      • 萬子、筒子、索子、風牌、三元牌を含む。

 

  1. IDから対応する牌文字列を取得:
    • return hai[id_int >> 2]:
      • 牌IDの右シフト(2ビット)を行い、対応する文字列をリストから取得して返します。

 

牌IDリストを画像表現に変換する関数

牌IDリストを対応する画像表現(Unicode文字)に変換し、文字列として返す。これにより、牌が画像表現に変換されます。

def id_convert_image(id_list):
  # Unicodeコードポイント
  n_unique_tiles = 34
  min_tile_code_point = 126976
  zihai = list(range(min_tile_code_point, min_tile_code_point + 4)) + [126982, 126981, 126980]
  manzu = list(range(126983, 126983 + 9))
  souzu = list(range(126992, 126992 + 9))
  pinzu = list(range(127001, 127001 + 9))

  image_id_list = manzu + pinzu + souzu + zihai

  # 天鳳の牌ID
  tenho_id_list = list(range(0, n_unique_tiles))

  # 天鳳IDとUnicodeコードポイントの対応辞書
  tenho_image_dic = dict(zip(tenho_id_list, image_id_list))

  # 牌IDリストを対応する画像表現に変換
  return "".join([chr(tenho_image_dic[i]) for i in id_list])
  1. Unicodeコードポイントの定義:
    • n_unique_tiles = 34:
      • ユニークな牌の数を定義。
    • min_tile_code_point = 126976:
      • Unicodeコードポイントの開始位置を定義。
    • zihai:
      • 字牌(風牌と三元牌)のUnicodeコードポイントを定義。
    • manzu:
      • 萬子のUnicodeコードポイントを定義。
    • souzu:
      • 索子のUnicodeコードポイントを定義。
    • pinzu:
      • 筒子のUnicodeコードポイントを定義。

 

  1. 牌IDとUnicodeコードポイントの対応リスト作成:
    • image_id_list = manzu + pinzu + souzu + zihai:
      • すべての牌のUnicodeコードポイントを結合してリストに格納。

 

  1. 天鳳の牌IDリスト:
    • tenho_id_list = list(range(0, n_unique_tiles)):
      • 天鳳の牌IDをリストに格納。

 

  1. 対応辞書の作成:
    • tenho_image_dic = dict(zip(tenho_id_list, image_id_list)):
      • 天鳳の牌IDとUnicodeコードポイントを対応させる辞書を作成。

 

  1. 牌IDリストを画像表現に変換:
    • return "".join([chr(tenho_image_dic[i]) for i in id_list]):
      • 牌IDリストを対応するUnicode文字列に変換し、結合して返します。

 

副露情報を牌IDリストに変換する関数

副露情報(ポン・チー・カン)を解析し、対応する牌IDリストを生成します。これにより、副露された牌を具体的に表現することが可能になります。

def furo_to_hai(furo):
  furo = int(furo)
  if (furo & 0x0004): # 順子の場合
    pattern = (furo & 0xFC00) >> 10
    pattern = int(pattern / 3)
    color_id = int(pattern / 7)
    number = pattern % 7 + 1
    mentsu_num = [number, number + 1, number + 2]
    soezi = [(furo & 0x0018) >> 3, (furo & 0x0060) >> 5, (furo & 0x0180) >> 7] # 牌添字1〜3を取得
    mentsu = [color_id * 9 * 4 + (i - 1) * 4 + j for i, j in zip(mentsu_num, soezi)]

  elif (furo & 0x0018): # 刻子、加槓の場合
    pattern = (furo & 0xFE00) >> 9
    pattern = int(pattern / 3)
    color_id = int(pattern / 9)
    number = pattern % 9 + 1
    soezi = [0, 1, 2, 3]

    if (furo & 0x0008): # 刻子
      mentsu_num = [number, number, number]
      soezi_exclude = (furo & 0x0060) >> 5
      soezi.remove(soezi_exclude)
      mentsu = [color_id * 9 * 4 + (i - 1) * 4 + j for i, j in zip(mentsu_num, soezi)]

    else: # 加槓
      mentsu_num = [number, number, number, number]
      mentsu = [color_id * 9 * 4 + (i - 1) * 4 + j for i, j in zip(mentsu_num, soezi)]

  else: # 暗槓、大明槓の場合
    pattern = (furo & 0xFF00) >> 8
    pattern = int(pattern / 4)
    color_id = int(pattern / 9)
    number = pattern % 9 + 1
    soezi = [0, 1, 2, 3]
    mentsu_num = [number, number, number, number]

    if (furo & 0x0003): # 大明槓
      mentsu = [color_id * 9 * 4 + (i - 1) * 4 + j for i, j in zip(mentsu_num, soezi)]
    else: # 暗槓
      mentsu = [color_id * 9 * 4 + (i - 1) * 4 + j for i, j in zip(mentsu_num, soezi)]

  mentsu_str = [str(i) for i in mentsu]
  return mentsu_str
  1. 副露情報の整数変換:

    • furo = int(furo):
      • 副露情報を整数に変換します。
  2. 順子の場合の処理:

    • if (furo & 0x0004)::
      • 順子の場合の条件分岐。
    • pattern = (furo & 0xFC00) >> 10:
      • パターンを抽出し、右シフトして処理します。
    • color_id = int(pattern / 7):
      • 色IDを計算します。
    • number = pattern % 7 + 1:
      • 数字を計算します。
    • mentsu_num = [number, number + 1, number + 2]:
      • 面子の数字を決定します。
    • soezi = [(furo & 0x0018) >> 3, (furo & 0x0060) >> 5, (furo & 0x0180) >> 7]:
      • 添字を抽出します。
    • mentsu = [color_id * 9 * 4 + (i - 1) * 4 + j for i, j in zip(mentsu_num, soezi)]:
      • 牌IDを計算します。
  3. 刻子、加槓の場合の処理:

    • elif (furo & 0x0018)::

      • 刻子または加槓の場合の条件分岐。
    • pattern = (furo & 0xFE00) >> 9:

      • パターンを抽出し、右シフトして処理します。
    • color_id = int(pattern / 9):

      • 色IDを計算します。
    • number = pattern % 9 + 1:

      • 数字を計算します。
    • soezi = [0, 1, 2, 3]:

      • 添字をリストに格納します。
    • 刻子の場合の処理:

      • if (furo & 0x0008)::
        • 刻子の場合の条件分岐。
      • mentsu_num = [number, number, number]:
        • 面子の数字を決定します。
      • soezi_exclude = (furo & 0x0060) >> 5:
        • 除外する添字を計算します。
      • soezi.remove(soezi_exclude):
        • 除外する添字をリストから削除します。
      • mentsu = [color_id * 9 * 4 + (i - 1) * 4 + j for i, j in zip(mentsu_num, soezi)]:
        • 牌IDを計算します。
    • 加槓の場合の処理:

      • else::
        • 加槓の場合の条件分岐。
      • mentsu_num = [number, number, number, number]:
        • 面子の数字を決定します。
      • mentsu = [color_id * 9 * 4 + (i - 1) * 4 + j for i, j in zip(mentsu_num, soezi)]:
        • 牌IDを計算します。
  4. 暗槓、大明槓の場合の処理:

    • else::

      • 暗槓または大明槓の場合の条件分岐。
    • pattern = (furo & 0xFF00) >> 8:

      • パターンを抽出し、右シフトして処理します。
    • color_id = int(pattern / 9):

      • 色IDを計算します。
    • number = pattern % 9 + 1:

      • 数字を計算します。
    • soezi = [0, 1, 2, 3]:

      • 添字をリストに格納します。
    • mentsu_num = [number, number, number, number]:

      • 面子の数字を決定します。
    • 大明槓の場合の処理:

      • if (furo & 0x0003)::
        • 大明槓の場合の条件分岐。
      • mentsu = [color_id * 9 * 4 + (i - 1) * 4 + j for i, j in zip(mentsu_num, soezi)]:
        • 牌IDを計算します。
    • 暗槓の場合の処理:

      • else::
        • 暗槓の場合の条件分岐。
      • mentsu = [color_id * 9 * 4 + (i - 1) * 4 + j for i, j in zip(mentsu_num, soezi)]:
        • 牌IDを計算します。
  5. 結果の文字列リスト化:

    • mentsu_str = [str(i) for i in mentsu]:
      • 牌IDを文字列に変換してリスト化します。
    • return mentsu_str:
      • 結果のリストを返します。

和了データフレームの前処理と特徴量作成

和了データフレームの前処理を行い、各種特徴量を抽出および変換します。これにより、モデル学習に適した形式のデータを準備します。

agari_df["yaku"] = agari_df.yaku.astype('str')
agari_df["yakuman"] = agari_df.yakuman.astype('str')

agari_df["hai"] = agari_df.hai.apply(lambda x: x.split(","))

agari_df["menzen_int"] = agari_df.hai.apply(lambda x: [int(i) for i in x])
agari_df["menzen_str"] = agari_df.hai.apply(lambda x: [hai_convert_str(i) for i in x])

agari_df["furo_int"] = agari_df.m.apply(lambda x: [int(i) for i in naki_treatment(x)[0]])
agari_df["furo_str"] = agari_df.m.apply(lambda x: naki_treatment(x)[1])

agari_df["yaku_all"] = agari_df[["yaku", "yakuman"]].apply(lambda x: yaku_append(x)[0], axis=1)
agari_df["yaku_label"] = agari_df.yaku_all.apply(lambda x: yaku_label(x)[0]) # 役(対象のみ)のone-hot
agari_df["yaku_label_str"] = agari_df.yaku_all.apply(lambda x: yaku_label(x)[1]) # 役(対象のみ)
agari_df["yaku_label_num"] = agari_df.yaku_label_str.apply(len) # 役の数

agari_df["tehai_int"] = agari_df[["menzen_int", "furo_int"]].apply(lambda x: sorted([int(i) for i in x[1]+x[0]]), axis=1)
agari_df["tehai_str"] = agari_df.tehai_int.apply(lambda x: [hai_convert_str(i) for i in x])
agari_df["tehai_c_int"] = agari_df.tehai_int.apply(lambda x: sorted([id_int >> 2 for id_int in x]))
agari_df["tehai_im"] = agari_df.tehai_c_int.apply(lambda x: id_convert_image(x))
  1. 役情報の文字列変換:
    • agari_df["yaku"] = agari_df.yaku.astype('str'):
      • 役情報を文字列型に変換。
    • agari_df["yakuman"] = agari_df.yakuman.astype('str'):
      • 役満情報を文字列型に変換。

 

  1. 牌情報の分割:
    • agari_df["hai"] = agari_df.hai.apply(lambda x: x.split(",")):
      • 牌情報をカンマで分割しリストに変換。

 

  1. 面前牌の整数変換と文字列変換:
    • agari_df["menzen_int"] = agari_df.hai.apply(lambda x: [int(i) for i in x]):
      • 面前牌を整数に変換。
    • agari_df["menzen_str"] = agari_df.hai.apply(lambda x: [hai_convert_str(i) for i in x]):
      • 面前牌を文字列に変換。

 

  1. 副露牌の整数変換と文字列変換:
    • agari_df["furo_int"] = agari_df.m.apply(lambda x: [int(i) for i in naki_treatment(x)[0]]):
      • 副露牌を整数に変換。
    • agari_df["furo_str"] = agari_df.m.apply(lambda x: naki_treatment(x)[1]):
      • 副露牌を文字列に変換。

 

  1. 役情報の統合とラベル化:
    • agari_df["yaku_all"] = agari_df[["yaku", "yakuman"]].apply(lambda x: yaku_append(x)[0], axis=1):
      • 役と役満情報を統合してリスト化。
    • agari_df["yaku_label"] = agari_df.yaku_all.apply(lambda x: yaku_label(x)[0]):
      • 役情報をone-hotエンコード。
    • agari_df["yaku_label_str"] = agari_df.yaku_all.apply(lambda x: yaku_label(x)[1]):
      • 役情報の文字列表現。
    • agari_df["yaku_label_num"] = agari_df.yaku_label_str.apply(len):
      • 役の数を計算。

 

  1. 手牌の統合と画像変換:
    • agari_df["tehai_int"] = agari_df[["menzen_int", "furo_int"]].apply(lambda x: sorted([int(i) for i in x[1]+x[0]]), axis=1):
      • 面前牌と副露牌を統合してソート。
    • agari_df["tehai_str"] = agari_df.tehai_int.apply(lambda x: [hai_convert_str(i) for i in x]):
      • 手牌を文字列に変換。
    • agari_df["tehai_c_int"] = agari_df.tehai_int.apply(lambda x: sorted([id_int >> 2 for id_int in x])):
      • 牌IDをシフトしてソート。
    • agari_df["tehai_im"] = agari_df.tehai_c_int.apply(lambda x: id_convert_image(x)):
      • 手牌を画像表現に変換。

 

和了データフレームの先頭1行の表示

和了データフレームの先頭1行が表示されます。これは、前処理と特徴量作成が正しく行われたかを確認するために使用されます。

agari_df.head(1)

以下に、表示されるデータの例を示します:

ba hai m machi ten yaku yakuman menzen_int menzen_str furo_int furo_str yaku_all yaku_label yaku_label_str yaku_label_num tehai_int tehai_str tehai_c_int tehai_im
0 1,2 [11, 12, 13, 14, 15, 16, 17, 18, 19] 9 13 30,8000,1 1,1,0,1,52,3,53,0 [11, 12, 13, 14, 15, 16, 17, 18, 19] ['一', '二', '三', '四', '五', '六', '七', '八', '九'] [] [] [1, 1, 0, 1, 52, 3, 53, 0] [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] ['役牌 白', '立直', '門前清自摸和'] 3 [11, 12, 13, 14, 15, 16, 17, 18, 19] ['一', '二', '三', '四', '五', '六', '七', '八', '九'] [2, 3, 4, 5, 6, 7, 8, 9, 10] ['🀄', '🀅', '🀆', '🀇', '🀈', '🀉', '🀊', '🀋', '🀌']

上記の表は、各列がどのような情報を持っているかを示しています。データフレームの列についての詳細な説明は以下の通りです:

  • ba: 場の情報
  • hai: 牌の情報(リスト形式)
  • m: 副露の情報
  • machi: 待ち牌の情報
  • ten: 点数の情報
  • yaku: 役の情報
  • yakuman: 役満の情報
  • menzen_int: 面前牌の整数表現
  • menzen_str: 面前牌の文字列表現
  • furo_int: 副露牌の整数表現
  • furo_str: 副露牌の文字列表現
  • yaku_all: すべての役情報
  • yaku_label: 役情報のone-hotエンコード
  • yaku_label_str: 役情報の文字列表現
  • yaku_label_num: 役の数
  • tehai_int: 手牌の整数表現
  • tehai_str: 手牌の文字列表現
  • tehai_c_int: 牌IDをシフトした整数表現
  • tehai_im: 手牌の画像表現

不要なデータの除去とデータフレームのリセット

役のない和了情報を除去し、整理されたデータフレームを得ることができます。

agari_df = agari_df[agari_df.yaku_label_num != 0].reset_index(drop=True)
len(agari_df)
  1. 不要なデータの除去:

    • agari_df = agari_df[agari_df.yaku_label_num != 0].reset_index(drop=True):
      • 役の数が0でない行のみをフィルタリングし、データフレームを更新します。また、データフレームのインデックスをリセットします。
  2. データフレームの長さ確認:

    • len(agari_df):
      • フィルタリング後のデータフレームの行数を確認します。

和了データフレームの特定列の先頭5行の表示

和了データフレームの特定の列の先頭5行が表示されます。

agari_df[["yaku_label", "yaku_label_str", "tehai_str", "tehai_c_int", "tehai_im"]].head(5)

各列の説明

  • yaku_label:

    • 役情報のone-hotエンコード。
  • yaku_label_str:

    • 役情報の文字列表現。
  • tehai_str:

    • 手牌の文字列表現。
  • tehai_c_int:

    • 手牌のIDをシフトした整数表現。
  • tehai_im:

    • 手牌の画像表現。

以下に表示されるデータの例を示します:

この表示により、和了データの具体的な内容を確認することができます。

データフレームの行数とユニークな手牌の数の確認

データセットに含まれる和了情報の総数と、データの中にどれだけ違った形の手牌の組み合わせの数があるかわかります。

print(len(agari_df))
print(agari_df.tehai_c_int.apply(lambda x: str(x)).nunique())
  1. データフレームの行数の確認:

    • print(len(agari_df)):
      • フィルタリング後のデータフレームの行数を出力します。
  2. ユニークな手牌の数の確認:

    • print(agari_df.tehai_c_int.apply(lambda x: str(x)).nunique()):
      • 各手牌(tehai_c_int)を文字列に変換し、特定の手牌の数を出力します。

実行結果の例:

yaku_label yaku_label_str tehai_str tehai_c_int tehai_im
[0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] ['役牌 白'] ['②', '③', '④', '④', '④', '⑤', '⑤', '⑤', '5', '5', '5', '白', '白', '白'] [10, 11, 12, 12, 12, 13, 13, 13, 22, 22, 22, 31, 31, 31] 🀚🀛🀜🀜🀜🀝🀝🀝🀔🀔🀔🀆🀆🀆
[1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] ['断幺九'] ['三', '三', '四', '五', '六', '②', '②', '③', '③', '④', '④', '3', '4', '5'] [2, 2, 3, 4, 5, 10, 10, 11, 11, 12, 12, 20, 21, 22] 🀉🀉🀊🀋🀌🀚🀚🀛🀛🀜🀜🀒🀓🀔
[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] ['役牌 中'] ['二', '三', '四', '九', '九', '5', '6', '7', '東', '東', '東', '中', '中', '中'] [1, 2, 3, 8, 8, 22, 23, 24, 27, 27, 27, 33, 33, 33] 🀈🀉🀊🀏🀏🀔🀕🀖🀀🀀🀀🀄🀄🀄
[0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] ['役牌 白'] ['四', '五', '六', '⑤', '⑤', '⑤', '⑨', '⑨', '3', '4', '5', '白', '白', '白'] [3, 4, 5, 13, 13, 13, 17, 17, 20, 21, 22, 31, 31, 31] 🀊🀋🀌🀝🀝🀝🀡🀡🀒🀓🀔🀆🀆🀆
[0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] ['役牌 白'] ['六', '七', '八', '①', '②', '③', '④', '④', '6', '7', '8', '白', '白', '白'] [5, 6, 7, 9, 10, 11, 12, 12, 23, 24, 25, 31, 31, 31] 🀌🀍🀎🀙🀚🀛🀜🀜🀕🀖🀗🀆🀆🀆
190657 # agari_dfの行数
148864 # agari_dfの行数

PyTorchデータセットクラスの定義

このデータセットクラスを使うことで、PyTorchのデータローダに渡して効率的にデータをバッチ処理することができます。

class MyDataset(torch.utils.data.Dataset):
  def __init__(self, agari_df):
    self.tehai_list = agari_df["tehai_c_int"].tolist()
    self.yaku_label_list = agari_df["yaku_label"].tolist()

  def __len__(self):
    return len(self.tehai_list)

  def __getitem__(self, index):
    tehai, yaku_label = self.pull_item(index)
    return tehai, yaku_label

  def tehai_2d(self, tehai):
    c = collections.Counter(tehai)
    tehai_ar = np.zeros(4*9*4)
    for i in c.items():
      hai_id = i[0] # 牌の種類 0index
      hai_times = i[1] # 枚数
      hai_id_list = [hai_id*4+i for i in range(hai_times)]
      for j in hai_id_list:
        tehai_ar[j] = 1
    tehai_m = tehai_ar[0:9*1*4].copy().reshape([-1, 4])
    tehai_p = tehai_ar[9*1*4:9*2*4].copy().reshape([-1, 4])
    tehai_s = tehai_ar[9*2*4:9*3*4].copy().reshape([-1, 4])
    tehai_z = tehai_ar[9*3*4:].copy().reshape([-1, 4])
    tehai_ar = np.concatenate([tehai_m, tehai_p, tehai_s, tehai_z], 1)
    return tehai_ar

  def pull_item(self, index):
    tehai = self.tehai_list[index]
    yaku_label = self.yaku_label_list[index]
    tehai_all = torch.tensor(np.array([self.tehai_2d(tehai)]),
                             dtype=torch.float32, device=DEVICE)
    yaku_label = torch.tensor(np.array(yaku_label), dtype=torch.float32, device=DEVICE)

    return tehai_all, yaku_label
  1. データセットクラスの定義:

    • class MyDataset(torch.utils.data.Dataset):
      • PyTorchのDatasetクラスを継承してカスタムデータセットクラスを定義。
  2. 初期化メソッド:

    • def __init__(self, agari_df):
      • データフレームから手牌リストと役ラベルリストを取得して初期化。
  3. データセットの長さ:

    • def __len__(self):
      • データセットの長さ(手牌の数)を返す。
  4. データ取得メソッド:

    • def __getitem__(self, index):
      • 指定されたインデックスに対応する手牌と役ラベルを返す。
  5. 手牌を2次元配列に変換:

    • def tehai_2d(self, tehai):
      • 手牌を2次元配列に変換する。
      • 牌の種類ごとにカウントし、適切な位置に1を配置。
  6. アイテム取得メソッド:

    • def pull_item(self, index):
      • 指定されたインデックスに対応する手牌と役ラベルをテンソル形式で返す。

データセットの分割とPyTorchデータセットの作成

元のデータフレームを訓練、検証、およびテストの3つのデータセットに分割し、それぞれPyTorchのデータセットとして使用できるように準備します。これらのデータセットは、モデルの学習、検証、およびテストに利用されます。

BATCH_SIZE = 1024
TRAIN_SIZE = int(len(agari_df) // (BATCH_SIZE * 10) * (BATCH_SIZE * 10) * 0.6)
VALID_SIZE = int(len(agari_df) // (BATCH_SIZE * 10) * (BATCH_SIZE * 10) * 0.3)

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

df_train, df_val = train_test_split(agari_df, train_size=TRAIN_SIZE)
df_val, df_test = train_test_split(df_val, train_size=VALID_SIZE)

train_dataset = MyDataset(df_train)
val_dataset = MyDataset(df_val)
test_dataset = MyDataset(df_test)
  1. バッチサイズの設定:

    • BATCH_SIZE = 1024:
      • データを処理する際の一度に扱うデータ数を設定します。
  2. データセットのサイズの計算:

    • TRAIN_SIZE:
      • 訓練データのサイズを計算します。データセットの60%を訓練用に設定します。
    • VALID_SIZE:
      • 検証データのサイズを計算します。データセットの30%を検証用に設定します。
  3. デバイスの設定:

    • DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu'):
      • GPUが利用可能であればGPUを、そうでなければCPUを使用します。
  4. データセットの分割:

    • df_train, df_val = train_test_split(agari_df, train_size=TRAIN_SIZE):
      • データフレームを訓練データと検証データに分割します。
    • df_val, df_test = train_test_split(df_val, train_size=VALID_SIZE):
      • 検証データをさらに検証データとテストデータに分割します。
  5. PyTorchデータセットの作成:

    • train_dataset = MyDataset(df_train):
      • 訓練データ用のデータセットを作成します。
    • val_dataset = MyDataset(df_val):
      • 検証データ用のデータセットを作成します。
    • test_dataset = MyDataset(df_test):
      • テストデータ用のデータセットを作成します。

データローダーの作成

データローダーの役割

データローダーは、データセットからデータをバッチ単位で効率的に読み込むために使用されます。データローダーを使用することで、以下のメリットがあります:

  • バッチ処理:

    • 一度に指定した数のデータを読み込むことで、メモリの効率的な使用と計算の高速化を図ります。
  • シャッフル:

    • 訓練データをランダムにシャッフルすることで、モデルがデータの順序に依存することを防ぎます。
  • データセットの分割:

    • 訓練、検証、およびテスト用のデータセットをそれぞれのデータローダーに分けて使用することで、異なるフェーズでのデータ管理が簡単になります。

これにより、モデルの訓練、検証、およびテストの各フェーズでデータを効率的に読み込むことができます。

train_dataloader = torch.utils.data.DataLoader(
    train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_dataloader = torch.utils.data.DataLoader(
    val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_dataloader = torch.utils.data.DataLoader(
    test_dataset, batch_size=BATCH_SIZE, shuffle=False)
  1. 訓練データローダーの作成:

    • train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True):
      • 訓練データセットを使用してデータローダーを作成します。
      • batch_sizeは一度に読み込むデータの数を設定し、ここではBATCH_SIZE(1024)を使用。
      • shuffle=Trueはデータをランダムにシャッフルすることを意味し、訓練データに対して一般的に使用されます。
  2. 検証データローダーの作成:

    • val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False):
      • 検証データセットを使用してデータローダーを作成します。
      • shuffle=Falseはデータをシャッフルしないことを意味し、検証データやテストデータでは一般的です。
  3. テストデータローダーの作成:

    • test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False):
      • テストデータセットを使用してデータローダーを作成します。
      • shuffle=Falseはデータをシャッフルしないことを意味します。

麻雀AIのトレーニングスクリプト

麻雀AIモデルの訓練、検証、およびテストを行うための準備を行います。

定数の定義とデバイスの設定

麻雀AIの学習に必要な定数を定義し、使用する計算デバイス(GPUまたはCPU)を設定します。

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import numpy as np

# 定数の定義
WEIGHT_DECAY = 0.005  # 重み減衰率(L2正則化)を設定。過学習を防ぐために使用。
LEARNING_RATE = 0.01  # 学習率を設定。モデルの重みの更新幅を決定。
EPOCH = 100  # エポック数を設定。データセット全体を何回繰り返して学習するかを示す。
LABEL_NUM = len(agari_df.yaku_label[0])  # ラベルの数(役の数)を設定。agari_dfが事前に定義されていることが前提。

# デバイスの設定
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # GPUが利用可能であればGPUを、そうでなければCPUを使用するように設定。
  1. ライブラリのインポート:

    • import torch:
      • PyTorchライブラリの基本機能をインポート。
    • import torch.nn as nn:
      • PyTorchのニューラルネットワークモジュールをインポート。
    • import torch.optim as optim:
      • PyTorchの最適化アルゴリズムモジュールをインポート。
    • import torchvision.transforms as transforms:
      • PyTorchの画像変換モジュールをインポート。
    • import numpy as np:
      • 数値計算を効率的に行うためのNumPyライブラリをインポート。
  2. 定数の定義:

    • WEIGHT_DECAY = 0.005:
      • 重み減衰率(L2正則化)を設定。過学習を防ぐために使用。
    • LEARNING_RATE = 0.01:
      • 学習率を設定。モデルの重みの更新幅を決定。
    • EPOCH = 100:
      • エポック数を設定。データセット全体を何回繰り返して学習するかを示す。
    • LABEL_NUM = len(agari_df.yaku_label[0]):
      • ラベルの数(役の数)を設定。agari_dfが事前に定義されていることが前提。
  3. デバイスの設定:

    • DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu"):
      • GPUが利用可能であればGPUを、そうでなければCPUを使用するように設定。モデルの学習と推論の高速化のためにGPUを使用するのが一般的。

精度を計算する関数

測されたラベルと実際のラベルを比較し、予測精度を計算します。

def pred_acc(original, predicted):
    label_predicted = (predicted > 0).int()
    return label_predicted.eq(original).sum().item() / len(original)
  • def pred_acc(original, predicted)::

    • original(実際のラベル)とpredicted(予測されたラベル)を引数に取る関数を定義します。
  • label_predicted = (predicted > 0).int():

    • 予測された値をバイナリ化し、整数型に変換します。
    • predicted > 0:予測値が0より大きければ1、そうでなければ0に変換します。
    • .int():バイナリ化された値を整数型に変換します。
  • return label_predicted.eq(original).sum().item() / len(original):

    • 正解率を計算して返します。
    • label_predicted.eq(original):予測ラベルと実際のラベルを比較し、一致する要素はTrue(1)、一致しない要素はFalse(0)とします。
    • .sum().item():Trueの数(正解数)を合計します。
    • / len(original):正解数を全体の数で割って、正解率(精度)を計算します。

モデルのインスタンス化とオプティマイザの定義

ここはわかりずらいの塊ごとにいきます。

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import numpy as np

# 定数の定義
WEIGHT_DECAY = 0.005
LEARNING_RATE = 0.01
EPOCH = 100
LABEL_NUM = len(agari_df.yaku_label[0])  # agari_dfが定義されていることを前提とします

# デバイスの設定
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ネットワークの定義
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.relu = nn.ReLU()
        self.pool = nn.MaxPool2d(2, stride=2)
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=2, padding=1)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=2, padding=1)
        self.fc1 = nn.Linear(6336, 120)
        self.fc2 = nn.Linear(120, LABEL_NUM)

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = x.view(x.size()[0], -1)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

# 精度を計算する関数
def pred_acc(original, predicted):
    label_predicted = (predicted > 0).int()
    return label_predicted.eq(original).sum().item() / len(original)

# モデルのインスタンス化とデバイスへの移動
net = Net()
net = net.to(DEVICE)

# 損失関数とオプティマイザの定義
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.SGD(net.parameters(), lr=LEARNING_RATE, momentum=0.9, weight_decay=WEIGHT_DECAY)

# データローダー (train_dataloader と val_dataloader) が定義されていることを前提とします
train_loss_value = []
train_acc_value = []
test_loss_value = []
test_acc_value = []

# トレーニングループ
for epoch in range(EPOCH):
    print('epoch', epoch + 1)  # epoch数の出力

    net.train()
    for inputs, labels in train_dataloader:
        inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

    running_loss = []
    running_acc = []

    net.eval()
    with torch.no_grad():
        for inputs, labels in train_dataloader:
            inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
            outputs = net(inputs)
            acc_ = [pred_acc(torch.Tensor.cpu(labels[j]), torch.Tensor.cpu(d)) for j, d in enumerate(outputs)]
            loss = criterion(outputs, labels)
            running_loss.append(loss.item())
            running_acc.append(np.mean(acc_))

    total_batch_loss = np.mean(running_loss)
    total_batch_acc = np.mean(running_acc)

    print("train mean loss={}, accuracy={}".format(total_batch_loss, total_batch_acc))
    train_loss_value.append(total_batch_loss)
    train_acc_value.append(total_batch_acc)

    running_loss_test = []
    running_acc_test = []

    with torch.no_grad():
        for inputs, labels in val_dataloader:
            inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
            outputs = net(inputs)
            acc_ = [pred_acc(torch.Tensor.cpu(labels[j]), torch.Tensor.cpu(d)) for j, d in enumerate(outputs)]
            loss = criterion(outputs, labels)
            running_loss_test.append(loss.item())
            running_acc_test.append(np.mean(acc_))

    total_batch_loss = np.mean(running_loss_test)
    total_batch_acc = np.mean(running_acc_test)

    print("test mean loss={}, accuracy={}".format(total_batch_loss, total_batch_acc))
    test_loss_value.append(total_batch_loss)
    test_acc_value.append(total_batch_acc)

やっていることは

1.ライブラリのインポート

  • 必要なライブラリをインポート。

2.定数の定義

  • 重み減衰率 (WEIGHT_DECAY = 0.005)
  • 学習率 (LEARNING_RATE = 0.01)
  • エポック数 (EPOCH = 100)
  • ラベル数 (LABEL_NUM = len(agari_df.yaku_label[0]))

3.デバイスの設定

  • GPUまたはCPUの選択。

4.ネットワークの定義

  • 畳み込み層と全結合層を持つニューラルネットワーク。

5.精度を計算する関数

  • 予測精度を計算。

6.モデルのインスタンス化とデバイスへの移動

  • モデルを作成し、デバイスに移動。

7.損失関数とオプティマイザの定義

  • バイナリクロスエントロピー損失関数とSGDオプティマイザ。

8.データローダーの準備

  • 損失と精度を格納するリストを初期化。

9.トレーニングループ

  • エポックごとに訓練と評価を実行。

10.訓練フェーズ

  • データを使用してモデルを訓練。

11.訓練データでの評価

  • 訓練データでモデルを評価。

12.テストデータでの評価

  • テストデータでモデルを評価。

です。以下に詳しく書いておきます。

詳細な解説

ライブラリのインポート

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import numpy as np
  • import torch: PyTorchライブラリをインポート。
  • import torch.nn as nn: ニューラルネットワークモジュールをインポート。
  • import torch.optim as optim: オプティマイザモジュールをインポート。
  • import torchvision.transforms as transforms: 画像変換モジュールをインポート。
  • import numpy as np: 数値計算ライブラリNumPyをインポート。

定数の定義

WEIGHT_DECAY = 0.005
LEARNING_RATE = 0.01
EPOCH = 100
LABEL_NUM = len(agari_df.yaku_label[0])  # agari_dfが定義されていることを前提とします
  • WEIGHT_DECAY = 0.005: 重み減衰率(L2正則化)を設定。過学習を防ぐために使用。
  • LEARNING_RATE = 0.01: 学習率を設定。モデルのパラメータ更新のステップサイズ。
  • EPOCH = 100: エポック数を設定。全データセットに対する訓練の繰り返し回数。
  • LABEL_NUM = len(agari_df.yaku_label[0]): ラベルの数を設定。データフレームagari_dfからラベルの長さを取得。

デバイスの設定

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  • DEVICE: GPUが利用可能ならGPUを、そうでなければCPUを使用するデバイスを設定。

ネットワークの定義

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.relu = nn.ReLU()
        self.pool = nn.MaxPool2d(2, stride=2)
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=2, padding=1)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=2, padding=1)
        self.fc1 = nn.Linear(6336, 120)
        self.fc2 = nn.Linear(120, LABEL_NUM)

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = x.view(x.size()[0], -1)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x
  • __init__:

    • super(Net, self).__init__(): 親クラスの__init__メソッドを呼び出し。
    • self.relu = nn.ReLU(): ReLU活性化関数を定義。
    • self.pool = nn.MaxPool2d(2, stride=2): プーリング層を定義。
    • self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=2, padding=1): 畳み込み層1を定義。
    • self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=2, padding=1): 畳み込み層2を定義。
    • self.fc1 = nn.Linear(6336, 120): 全結合層1を定義。
    • self.fc2 = nn.Linear(120, LABEL_NUM): 全結合層2を定義。
  • forward:

    • x = self.conv1(x): 入力xを畳み込み層1に通す。
    • x = self.relu(x): ReLU活性化関数を適用。
    • x = self.conv2(x): 畳み込み層2に通す。
    • x = self.relu(x): ReLU活性化関数を適用。
    • x = x.view(x.size()[0], -1): テンソルをフラット化。
    • x = self.fc1(x): 全結合層1に通す。
    • x = self.relu(x): ReLU活性化関数を適用。
    • x = self.fc2(x): 全結合層2に通す。
    • return x: 出力を返す。

精度を計算する関数

def pred_acc(original, predicted):
    label_predicted = (predicted > 0).int()
    return label_predicted.eq(original).sum().item() / len(original)
  • def pred_acc(original, predicted):: 精度を計算する関数を定義。
    • label_predicted = (predicted > 0).int(): 予測された値をバイナリ化して整数型に変換。
    • return label_predicted.eq(original).sum().item() / len(original): 正解率を計算して返す。

モデルのインスタンス化とデバイスへの移動

net = Net()
net = net.to(DEVICE)
  • net = Net(): Netクラスのインスタンスを作成。
  • net = net.to(DEVICE): モデルをデバイス(GPUまたはCPU)に移動。

損失関数とオプティマイザの定義

criterion = nn.BCEWithLogitsLoss()
optimizer = optim.SGD(net.parameters(), lr=LEARNING_RATE, momentum=0.9, weight_decay=WEIGHT_DECAY)
  • criterion = nn.BCEWithLogitsLoss(): バイナリクロスエントロピー損失関数を定義。
  • optimizer = optim.SGD(net.parameters(), lr=LEARNING_RATE, momentum=0.9, weight_decay=WEIGHT_DECAY): 確率的勾配降下法(SGD)オプティマイザを定義し、学習率、モーメンタム、重み減衰率を設定。

データローダーの準備

train_loss_value = []
train_acc_value = []
test_loss_value = []
test_acc_value = []
  • train_loss_value = []: 訓練データの損失値を格納するリストを初期化。
  • train_acc_value = []: 訓練データの精度を格納するリストを初期化。
  • test_loss_value = []: テストデータの損失値を格納するリストを初期化。
  • test_acc_value = []: テストデータの精度を格納するリストを初期化。

トレーニングループ

for epoch in range(EPOCH):
    print('epoch', epoch + 1)
  • for epoch in range(EPOCH):: エポック数のループを開始。
  • print('epoch', epoch + 1): 現在のエポック数を出力。

訓練フェーズ

net.train()
for inputs, labels in train_dataloader:
    inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
    optimizer.zero_grad()
    outputs = net(inputs)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()
  • net.train(): モデルを訓練モードに設定。
  • for inputs, labels in train_dataloader:: 訓練データローダーからデータを取得。
  • inputs, labels = inputs.to(DEVICE), labels.to(DEVICE): データをデバイスに移動。
  • optimizer.zero_grad(): 勾配を初期化。
  • outputs = net(inputs): 入力データをモデルに通して出力を取得。
  • loss = criterion(outputs, labels): 損失を計算。
  • loss.backward(): 逆伝播で勾配を計算。
  • optimizer.step(): オプティマイザでパラメータを更新。

訓練データでの評価フェーズ

running_loss = []
running_acc = []

net.eval()
with torch.no_grad():
    for inputs, labels in train_dataloader:


        inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
        outputs = net(inputs)
        acc_ = [pred_acc(torch.Tensor.cpu(labels[j]), torch.Tensor.cpu(d)) for j, d in enumerate(outputs)]
        loss = criterion(outputs, labels)
        running_loss.append(loss.item())
        running_acc.append(np.mean(acc_))

total_batch_loss = np.mean(running_loss)
total_batch_acc = np.mean(running_acc)

print("train mean loss={}, accuracy={}".format(total_batch_loss, total_batch_acc))
train_loss_value.append(total_batch_loss)
train_acc_value.append(total_batch_acc)
  • running_loss = []: 損失値を格納するリストを初期化。
  • running_acc = []: 精度を格納するリストを初期化。
  • net.eval(): モデルを評価モードに設定。
  • with torch.no_grad():: 勾配計算を無効にして評価を実施。
  • for inputs, labels in train_dataloader:: 訓練データローダーからデータを取得。
  • inputs, labels = inputs.to(DEVICE), labels.to(DEVICE): データをデバイスに移動。
  • outputs = net(inputs): 入力データをモデルに通して出力を取得。
  • acc_ = [pred_acc(torch.Tensor.cpu(labels[j]), torch.Tensor.cpu(d)) for j, d in enumerate(outputs)]: 精度を計算。
  • loss = criterion(outputs, labels): 損失を計算。
  • running_loss.append(loss.item()): 損失値をリストに追加。
  • running_acc.append(np.mean(acc_)): 精度をリストに追加。
  • total_batch_loss = np.mean(running_loss): 損失値の平均を計算。
  • total_batch_acc = np.mean(running_acc): 精度の平均を計算。
  • print("train mean loss={}, accuracy={}".format(total_batch_loss, total_batch_acc)): 損失値と精度を出力。
  • train_loss_value.append(total_batch_loss): 損失値をリストに追加。
  • train_acc_value.append(total_batch_acc): 精度をリストに追加。

テストデータでの評価フェーズ

running_loss_test = []
running_acc_test = []

with torch.no_grad():
    for inputs, labels in val_dataloader:
        inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
        outputs = net(inputs)
        acc_ = [pred_acc(torch.Tensor.cpu(labels[j]), torch.Tensor.cpu(d)) for j, d in enumerate(outputs)]
        loss = criterion(outputs, labels)
        running_loss_test.append(loss.item())
        running_acc_test.append(np.mean(acc_))

total_batch_loss = np.mean(running_loss_test)
total_batch_acc = np.mean(running_acc_test)

print("test mean loss={}, accuracy={}".format(total_batch_loss, total_batch_acc))
test_loss_value.append(total_batch_loss)
test_acc_value.append(total_batch_acc)
  • running_loss_test = []: テストデータの損失値を格納するリストを初期化。
  • running_acc_test = []: テストデータの精度を格納するリストを初期化。
  • with torch.no_grad():: 勾配計算を無効にして評価を実施。
  • for inputs, labels in val_dataloader:: 検証データローダーからデータを取得。
  • inputs, labels = inputs.to(DEVICE), labels.to(DEVICE): データをデバイスに移動。
  • outputs = net(inputs): 入力データをモデルに通して出力を取得。
  • acc_ = [pred_acc(torch.Tensor.cpu(labels[j]), torch.Tensor.cpu(d)) for j, d in enumerate(outputs)]: 精度を計算。
  • loss = criterion(outputs, labels): 損失を計算。
  • running_loss_test.append(loss.item()): 損失値をリストに追加。
  • running_acc_test.append(np.mean(acc_)): 精度をリストに追加。
  • total_batch_loss = np.mean(running_loss_test): 損失値の平均を計算。
  • total_batch_acc = np.mean(running_acc_test): 精度の平均を計算。
  • print("test mean loss={}, accuracy={}".format(total_batch_loss, total_batch_acc)): 損失値と精度を出力。
  • test_loss_value.append(total_batch_loss): 損失値をリストに追加。
  • test_acc_value.append(total_batch_acc): 精度をリストに追加。

トレーニング結果の可視化

トレーニングとテストの損失および精度の変化を視覚的に確認でき、モデルの学習状況を評価します。
トレーニングの損失(LOSS)と精度(ACCURACY)の変化を可視化するためにグラフを描画します。

fig = plt.figure(figsize=(10.0, 6.0))
ax1 = fig.add_subplot(1, 2, 1)
ax2 = fig.add_subplot(1, 2, 2)

ax1.plot(range(EPOCH), train_loss_value)
ax1.plot(range(EPOCH), test_loss_value, c='#00ff00')
ax1.set_xlabel('EPOCH')
ax1.set_ylabel('LOSS')
ax1.legend(['train loss', 'test loss'])
ax1.set_title('loss')

ax2.plot(range(EPOCH), train_acc_value)
ax2.plot(range(EPOCH), test_acc_value, c='#00ff00')
ax2.set_xlabel('EPOCH')
ax2.set_ylabel('ACCURACY')
ax2.legend(['train acc', 'test acc'])
ax2.set_title('accuracy')

plt.show()

スクリプトの詳細

  1. フィギュアの作成:

    • fig = plt.figure(figsize=(10.0, 6.0)):
      • 図のサイズを設定し、フィギュアを作成します。
  2. サブプロットの作成:

    • ax1 = fig.add_subplot(1, 2, 1):
      • 1行2列のレイアウトの1番目のサブプロットを作成します。
    • ax2 = fig.add_subplot(1, 2, 2):
      • 1行2列のレイアウトの2番目のサブプロットを作成します。
  3. 損失グラフの描画:

    • ax1.plot(range(EPOCH), train_loss_value):
      • トレーニング損失をプロットします。
    • ax1.plot(range(EPOCH), test_loss_value, c='#00ff00'):
      • テスト損失をプロットします。
    • ax1.set_xlabel('EPOCH'):
      • X軸のラベルを設定します。
    • ax1.set_ylabel('LOSS'):
      • Y軸のラベルを設定します。
    • ax1.legend(['train loss', 'test loss']):
      • 凡例を設定します。
    • ax1.set_title('loss'):
      • グラフのタイトルを設定します。
  4. 精度グラフの描画:

    • ax2.plot(range(EPOCH), train_acc_value):
      • トレーニング精度をプロットします。
    • ax2.plot(range(EPOCH), test_acc_value, c='#00ff00'):
      • テスト精度をプロットします。
    • ax2.set_xlabel('EPOCH'):
      • X軸のラベルを設定します。
    • ax2.set_ylabel('ACCURACY'):
      • Y軸のラベルを設定します。
    • ax2.legend(['train acc', 'test acc']):
      • 凡例を設定します。
    • ax2.set_title('accuracy'):
      • グラフのタイトルを設定します。
  5. グラフの表示:

    • plt.show():
      • 描画したグラフを表示します。

こんな感じの表が出てくるはずです!全て読み込ませなくても出てきます。ただし、このepocだけは100回回してください。

テストデータでの精度評価と予測ラベルの保存

テストデータに対するモデルの性能を評価し、予測結果を収集することができます。

predicted_label_list = []

def pred_acc_test(original, predicted):
    label_predicted = (predicted > 0).int()
    return label_predicted.eq(original).sum().numpy() / len(original), list(label_predicted.detach().numpy())

running_loss_test = []
running_acc_test = []

net.eval()
with torch.no_grad():
    for (inputs, labels) in test_dataloader:
        inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
        outputs = net(inputs)
        acc_ = []

        for j, d in enumerate(outputs):
            acc, predicted_label = pred_acc_test(torch.Tensor.cpu(labels[j]), torch.Tensor.cpu(d))
            acc_.append(acc)
            predicted_label_list.append(predicted_label)

        loss = criterion(outputs, labels)
        running_loss_test.append(loss.item())
        running_acc_test.append(np.asarray(acc_).mean())

total_batch_loss = np.asarray(running_loss_test).mean()
total_batch_acc = np.asarray(running_acc_test).mean()

print("test mean loss={}, accuracy={}".format(total_batch_loss, total_batch_acc))
test_loss_value.append(total_batch_loss)
test_acc_value.append(total_batch_acc)
  1. 予測ラベルリストの初期化:

    • predicted_label_list = []:
      • 予測ラベルを保存するリストを初期化します。
  2. 精度計算関数:

    • def pred_acc_test(original, predicted)::
      • 予測ラベルを生成し、精度を計算する関数を定義します。
      • label_predicted = (predicted > 0).int():
        • 予測された出力をしきい値0でバイナリ化し、整数型に変換します。
      • return label_predicted.eq(original).sum().numpy() / len(original), list(label_predicted.detach().numpy()):
        • 精度と予測ラベルを返します。
  3. テストデータでの予測と評価:

    • running_loss_test = []:
      • テストデータの損失を保存するリストを初期化します。
    • running_acc_test = []:
      • テストデータの精度を保存するリストを初期化します。
  4. モデルの評価モードに設定:

    • net.eval():
      • モデルを評価モードに設定します(ドロップアウトやバッチ正規化を無効にします)。
  5. テストデータの予測と損失計算:

    • with torch.no_grad()::
      • 勾配計算を無効にして予測を行います。
    • for (inputs, labels) in test_dataloader::
      • テストデータローダーから入力とラベルを取得します。
      • inputs, labels = inputs.to(DEVICE), labels.to(DEVICE):
        • 入力とラベルをデバイス(GPUまたはCPU)に移動します。
      • outputs = net(inputs):
        • モデルに入力を与えて予測を取得します。
      • acc, predicted_label = pred_acc_test(torch.Tensor.cpu(labels[j]), torch.Tensor.cpu(d)):
        • 精度を計算し、予測ラベルを取得します。
      • predicted_label_list.append(predicted_label):
        • 予測ラベルをリストに追加します。
      • loss = criterion(outputs, labels):
        • 損失を計算します。
      • running_loss_test.append(loss.item()):
        • 計算した損失をリストに追加します。
      • running_acc_test.append(np.asarray(acc_).mean()):
        • 計算した精度をリストに追加します。
  6. 損失と精度の平均を計算:

    • total_batch_loss = np.asarray(running_loss_test).mean():
      • テストデータ全体の損失の平均を計算します。
    • total_batch_acc = np.asarray(running_acc_test).mean():
      • テストデータ全体の精度の平均を計算します。
  7. 結果の出力と保存:

    • print("test mean loss={}, accuracy={}".format(total_batch_loss, total_batch_acc)):
      • 損失と精度の平均を出力します。
    • test_loss_value.append(total_batch_loss):
      • テストデータの損失をリストに保存します。
    • test_acc_value.append(total_batch_acc):
      • テストデータの精度をリストに保存します。

test mean loss=0.1747526698990872, accuracy=0.9329816297407124となりました、二十%の状態なので、ロスが大きいですね。

最後に

こんなに麻雀AIが学習するのに時間がかかるとは思わなかった・・・。

Discussion