🙌

Airbnb賃料予想Kaggleコンペで使った技術を勉強しながらまとめる

に公開

概要

実は東京のAirbnbの賃料を予測するプライベートのKaggleコンペに参加させていただいていました。(参加は非公開のURL経由でないとできない。)
期間は3月末~5月中旬までだったんですが振り返りをしていなかったのと、生成AIを使用したコーディングのリークの危険性や(終了前日に気づいた。)処理の内容についてもまとめたいと思い記事を書こうと思います

コード全文

コードと処理の内容に関しては以下のようになっています。実業務で機械学習を使用していない(画像処理で検討時に掠るくらい)のとClaudeに頼り切ったコーディングだったので間違っているかもしれません。また開催中も色々な方からのコードの共有が多く、正直ほとんどの部分をそういった方々からの共有コードを参考に書かせていただいてます。ありがとうございます🙏
https://www.kaggle.com/code/nkforwork/12th-place-solution

コード部分の技術

1.地図上賃貸の分布の可視化

今回は東京のAirbnbの賃料の予測のためのデータとして緯度経度が与えられていました。
この分布を見るためにmatplotlibを使用して以下のようなコードで分布を確認しました。

plt.scatter(df_train['longitude'], df_train['latitude'], alpha=0.5, label='Training', color='blue')
plt.scatter(df_test['longitude'], df_test['latitude'], alpha=0.5, label='Test', color='red')

実はtestデータとtrainデータの分布は南北で分かれており、そのためそれを可視化するために上のようなコードを使用しました。これでtrainデータは青、testデータは赤に着色され分布の様子を確認することができました。

2.特徴量作成

流石に特徴量全部についてまとめると長すぎるんで印象に残ったもののみ書きたいと思います。

2-1:スーパーホストか否か

Airbnbのスーパーホストとは優良なホスト(物件を保持している大家さんのようなもの?)に与えられる称号のようなものらしいです。評価基準は(3ヶ月ごとに審査)され例えば総合評価が4.8以上(5点満点)とかキャンセル率が1%以下(ただし、やむを得ない事情は除外)のホストに与えられるみたいです。
スーパーホストになると検索結果で優先的に表示されやすくなったり色々とメリットがあるみたいです。
今回の賃料を予想するというコンペにおいてこのスーパーホストか否かという特徴量は有効なようでLightGBMに読ませるためにtとfという文字を1と0に変換しました。

result_df['host_is_superhost_binary'] = result_df['host_is_superhost'].map({'t': 1, 'f': 0})

2-2:緯度経度情報を距離に変換して主要駅からの距離の特徴量を作成

Haversine距離というもので緯度経度を距離に近似しています。
また駅情報もstation.csvファイルにあったのでそれを使用して、駅との距離を特徴量として使用しています。
例えば東京とか渋谷とかから近いなら賃料は高くなりますが、郊外であれば賃料は低くなるのでその情報を特徴量にします。単純に中心街からの距離が賃料に影響しているわけではなく場所によって賃料がばらばらなのが面白かったです。(高級住宅街とかはやっぱり高いですよね。)
以下一部抜粋

一部抜粋
# ----- 駅情報と地理的特徴量 -----
    # 緯度経度をラジアンに変換する関数
    def to_radians(df, lat_col='latitude', lon_col='longitude'):
        return np.radians(df[[lat_col, lon_col]].values)
    
    # Haversine距離を計算する関数(球面近似)
    def haversine_distance(lat1, lon1, lat2, lon2):
        earth_radius_m = 6371000  # 地球半径(メートル)
        dlat = lat2 - lat1
        dlon = lon2 - lon1
        a = np.sin(dlat / 2.0)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2.0)**2
        c = 2 * np.arcsin(np.sqrt(a))
        return earth_radius_m * c
    
    # 緯度経度をラジアンに変換
    hotel_coords_rad = to_radians(result_df)
    station_coords_rad = to_radians(df_station, 'Latitude', 'Longitude')
    
    # 各施設から最も近い駅を検索
    tree = cKDTree(station_coords_rad)
    distances_rad, indices = tree.query(hotel_coords_rad, k=1)
    
    # 距離と駅名を特徴量として追加
    result_df['distance_m'] = distances_rad * 6371000  # 地球半径(メートル)
    result_df['nearest_station'] = df_station.iloc[indices]['Station'].values
    result_df['nearest_station_index'] = indices
    

2-3:name列からの特徴量抽出

実はnameの列にさまざまな特徴量が含まれていました。例えば★4.87のような評価を表す特徴量やベッド数、バスルーム数などが含まれています。またバスルームに関してはsharedやprivateなどが付いている場合がありこれも賃料に影響することが多かったです。(sharedは共有の風呂になるので賃料は低い傾向?)

一部抜粋
  # nameから評価と特徴量を抽出
    # ★の後ろにある数字(例: ★4.87)を抽出する関数
    def extract_star_rating(name):
        if pd.isna(name):
            return np.nan
        match = re.search(r'★\s*([\d\.]+)', str(name))
        return float(match.group(1)) if match else np.nan
    
    # ベッド数を抽出する関数
    def extract_beds(name):
        if pd.isna(name):
            return 0
        match = re.search(r'(\d+)\s*bed', str(name).lower())
        return int(match.group(1)) if match else 0
    
    # 寝室数を抽出する関数
    def extract_bedrooms(name):
        if pd.isna(name):
            return 0
        match = re.search(r'(\d+)\s*bedroom', str(name).lower())
        return int(match.group(1)) if match else 0
    
    # バスルーム数を抽出する関数(プライバシー情報も含む)
    def extract_bathrooms_with_privacy(name):
        if pd.isna(name):
            return 0, 1.0  # デフォルト値:バスルーム数0、プライバシー係数1.0
        
        # 基本のバスルーム数
        bath_match = re.search(r'(\d+(?:\.\d+)?)\s*bath', str(name).lower())
        bath_num = float(bath_match.group(1)) if bath_match else 0
        
        # プライバシー係数計算
        privacy_factor = 1.0  # デフォルト
        
        # sharedが含まれる場合は0.75を掛ける
        if 'shared bath' in str(name).lower():
            privacy_factor = 0.75
        
        # privateが含まれる場合は1.25を掛ける(念のため)
        if 'private bath' in str(name).lower():
            privacy_factor = 1.25
        
        return bath_num, privacy_factor
    

2-3: TF-IDF→SVD

今回ユーザーのコメントが入っている列がありそこには長文のデータがありました。こういった文章の分析をするためにTF-IDFというものを使用しました。

  • TF(単語の出現頻度): その物件でよく使われる単語
  • IDF(逆文書頻度): 特定の物件だけで使われる特徴的な単語
    ということみたいです。(難しい...)この二つを組み合わせることでTF-IDFとして単語の重要度を見つけることができます。
    例えばレストランのレビューで
店A: "美味しい パスタ 美味しい 美味しい"
店B: "美味しい ラーメン スープ"  
店C: "接客 丁寧 清潔"

があった場合
TF(単語の出現頻度)は
店Aでは「美味しい」が3回 → TFが高い
となり、
IDF(レア度)は
「美味しい」→ 店A,Bで使用(よくある単語)→ IDF低い
「パスタ」→ 店Aだけ(特徴的な単語)→ IDF高い
となります。

この二つを組み合わせることで、そのお店の「特徴」を数字のスコアとして表せます。
(TF-IDF = TF × IDF)
例えば店Aにはパスタという文字があるがこれは他の店のデータでは出てこないのでレア度が高い=特徴となる。また美味しいという言葉は他の店でも出てくるワードだが頻度が高いのでまあまあの特徴となる...この二つの情報を組み合わせて店Aの特徴としています。

ただこれだととんでもない数の特徴量ができてしまうのでSVDで情報量を極力落とすことなく特徴をまとめています。(次元削減)

コード外の技術

Claudeでの一括コーディング方法と消費トークン量の削減方法(と危険性)

本来Kaggleではnotebook形式で処理毎にコードは分割して書きますが、コードをClaudeにコピペして...処理を書き直し...というのをやっていると前後の処理も書き直さないといけないのでめんどくさい...となってしまい結果、notebookの単一のセルにコードを全部入れて処理を回すというようなことをしていました。また各セクションに

"""
6: 地理的クラスタリングによるバリデーション設計
"""

というような数字を付与することで、例えば指定セクションのみ直させることができ結果的にトークン消費量を減らせたのではないかと思いました。
ただしこれが裏目に出て、まさかの前日までtestデータがtrainデータにリークしていることに気がつきませんでした...(testとtrainの緯度経度情報で地理的バリデーションを行っていました...)
リーダーボードで追っていた数字はずっとtestデータに寄った(汎化されていない)スコアだったので本当に無駄なことをしてしまったな...と思います。自分の頭でちゃんと考えて設計をしないとこうなるので次に活かしたいです。

MCP サーバー

ちょうどコンペ中にMCPの発表があったので今回せっかくなのでMCPサーバーを自作して使ってみました。
とはいっても内容はシンプルでduckdbで指定したパスにあるcsvを読んでもらったり統計量を出してもらうというものです。ただこのmcpサーバーを使用してデータを読んだ状態で質問するとデータへの解像度が上がるみたいで結構良い返答を返すようになってました。
ただ今回のようにデータが多すぎるとデータ全てを読んでくれるわけではなく先頭100行くらいしか読ませることができませんでした。今だと他にも方法があるのかもしれません。

duckdbでreadcsvするmcpサーバーのコード抜粋
@mcp.tool()
async def read_csv_stats(csv_path: str, delimiter: str = ',', has_header: bool = True) -> str:
    """CSVファイルを読み込み、基本情報と統計量のみを返します。
    
    Args:
        csv_path: CSVファイルのパス
        delimiter: CSVの区切り文字(デフォルト: ',')
        has_header: ヘッダー行の有無(デフォルト: true)
    
    Returns:
        CSVファイルの基本情報(カラム、データ型、行数、統計量)
    """
    # DuckDB接続を作成
    conn = duckdb.connect(database=':memory:')
    
    # CSVをDuckDBに読み込み
    if has_header:
        query = f"CREATE TABLE csv_data AS SELECT * FROM read_csv_auto('{csv_path}', delim='{delimiter}');"
    else:
        query = f"CREATE TABLE csv_data AS SELECT * FROM read_csv_auto('{csv_path}', delim='{delimiter}', header=false);"
    
    conn.execute(query)
    
    # カラム情報を取得
    columns = get_column_info(conn)
    
    # 行数を取得
    row_count = conn.execute("SELECT COUNT(*) FROM csv_data").fetchone()[0]
    
    # 統計量を取得
    stats = get_stats(conn)
    
    # 結果をまとめる
    result = {
        "file_info": {
            "row_count": row_count,
            "column_count": len(columns)
        },
        "columns": columns,
        "statistics": stats
    }
    
    return json.dumps(result, indent=2)

@mcp.tool()
async def read_csv_sample(csv_path: str, sample_size: int = 5, delimiter: str = ',', has_header: bool = True) -> str:
    """CSVファイルの基本情報、統計量、およびサンプルデータを返します。
    
    Args:
        csv_path: CSVファイルのパス
        sample_size: 取得するサンプル行数(デフォルト: 5)
        delimiter: CSVの区切り文字(デフォルト: ',')
        has_header: ヘッダー行の有無(デフォルト: true)
    
    Returns:
        CSVファイルの基本情報とサンプルデータ
    """
    # DuckDB接続を作成
    conn = duckdb.connect(database=':memory:')
    
    # CSVをDuckDBに読み込み
    if has_header:
        query = f"CREATE TABLE csv_data AS SELECT * FROM read_csv_auto('{csv_path}', delim='{delimiter}');"
    else:
        query = f"CREATE TABLE csv_data AS SELECT * FROM read_csv_auto('{csv_path}', delim='{delimiter}', header=false);"
    
    conn.execute(query)
    
    # カラム情報を取得
    columns = get_column_info(conn)
    
    # 行数を取得
    row_count = conn.execute("SELECT COUNT(*) FROM csv_data").fetchone()[0]
    
    # 統計量を取得
    stats = get_stats(conn)
    
    # サンプルデータを取得(最初の数行)
    sample_data = conn.execute(f"SELECT * FROM csv_data LIMIT {sample_size}").fetchall()
    column_names = [desc[0] for desc in conn.description]
    
    formatted_sample = []
    for row in sample_data:
        row_dict = {}
        for i, value in enumerate(row):
            row_dict[column_names[i]] = str(value) if value is not None else None
        formatted_sample.append(row_dict)
    
    # 結果をまとめる
    result = {
        "file_info": {
            "row_count": row_count,
            "column_count": len(columns)
        },
        "columns": columns,
        "statistics": stats,
        "sample_data": formatted_sample
    }
    
    return json.dumps(result, indent=2)

@mcp.tool()
async def query_csv_limited(csv_path: str, sql_query: str, max_rows: int = 100, delimiter: str = ',', has_header: bool = True) -> str:
    """CSVファイルに対してSQLクエリを実行し、結果を制限して返します。
    
    Args:
        csv_path: CSVファイルのパス
        sql_query: 実行するSQLクエリ(テーブル名は「csv_data」)
        max_rows: 返す最大行数(デフォルト: 100)
        delimiter: CSVの区切り文字(デフォルト: ',')
        has_header: ヘッダー行の有無(デフォルト: true)
    
    Returns:
        SQLクエリの実行結果(制限付き)
    """
    # DuckDB接続を作成
    conn = duckdb.connect(database=':memory:')
    
    # CSVをDuckDBに読み込み
    if has_header:
        query = f"CREATE TABLE csv_data AS SELECT * FROM read_csv_auto('{csv_path}', delim='{delimiter}');"
    else:
        query = f"CREATE TABLE csv_data AS SELECT * FROM read_csv_auto('{csv_path}', delim='{delimiter}', header=false);"
    
    conn.execute(query)
    
    # LIMITを追加してクエリを制限
    if "LIMIT" not in sql_query.upper():
        if ";" in sql_query:
            sql_query = sql_query.replace(";", f" LIMIT {max_rows};")
        else:
            sql_query = f"{sql_query} LIMIT {max_rows}"
    
    # SQLクエリを実行
    result = conn.execute(sql_query).fetchall()
    
    # カラム名を取得
    column_names = [desc[0] for desc in conn.description]
    
    # 結果を整形
    formatted_result = []
    for row in result:
        row_dict = {}
        for i, value in enumerate(row):
            row_dict[column_names[i]] = str(value) if value is not None else None
        formatted_result.append(row_dict)
    
    return json.dumps(formatted_result, indent=2)

まとめ

非常に勉強になるコンペでSubmittionも結構するくらい熱中できました。別のコンペでも頑張りたいと思います!

Discussion