📍

町丁目住所リストに緯度・経度情報を付与(ジオコーディング)

に公開

住所リストに対して、参照データを元に緯度・経度を付与します(ジオコーディング)。
日本の住所体系の特性に合わせて設計された、3段階のマッチングロジックを採用しています。
APIを使わずに手元のPCで高速に処理を実行できます。

なおこの関数は参照データとして、国土交通省が提供する「位置参照情報」との連携を想定して
おりますので、以下サイトから、都道府県別の大字・町丁目レベルのデータをダウンロードして
利用できます。 (国土数値情報ダウンロードサイト): https://nlftp.mlit.go.jp/ksj/

なお、別記事に、ジオコーディングしたい住所リスト自体の整形を行うコードを掲載予定です。

import pandas as pd
import numpy as np
import time

def common_prefix_length(s1, s2):
    """(ヘルパー関数) 2つの文字列の先頭から連続で一致する文字数を返す。"""
    length = 0
    for char1, char2 in zip(s1, s2):
        if char1 == char2:
            length += 1
        else:
            break
    return length

def geocode_with_reference(
    arr_target_address: list or np.ndarray, 
    address_reference: pd.DataFrame, 
    match_threshold: float = 0.6, 
    set_match_high_threshold: float = 0.9
) -> pd.DataFrame:

    """
    ---
    § 処理の3段階ロジック
    ---
    1. **完全一致**: 参照データの「都道府県+市区町村+町丁目」が、処理対象の住所に
       完全に含まれるかを最初にチェックします。最も確実な方法です。

    2. **前方一致**: 「〇〇町」と「〇〇町ビル」のように、住所の先頭部分が連続して
       一致するかを評価します。語順が重要な日本語住所において効果的です。
       (しきい値: `match_threshold`)

    3. **文字集合一致**: 「大手町」と「大丁手町」のような、誤字や表記ゆれを救済
       するためのロジックです。構成文字がどの程度一致するかを評価します。
       誤マッチを防ぐため、非常に高い一致率が求められます。
       (しきい値: `set_match_high_threshold`)
    ---
    § 特徴と高速化の工夫
    ---
    この関数は、大量の住所データを効率的に処理するために、いくつかの重要な工夫を
    盛り込んでいます。

    1. **オフライン処理による速度確保**:
       外部APIへの問い合わせを行わず、手元の計算資源のみで処理が完結します。
       通信の待ち時間が発生しないため、大量データでも安定して高速に動作します。

    2. **探索空間の劇的な削減(最も重要な高速化技術)**:
       1件のターゲット住所に対して、全ての参照データ(全国で約18万件)を一つずつ
    比較する「総当たり」方式になり、膨大な計算が必要になるため、以下のように、
    処理開始時に参照データを市区町村名でグループ分けした
    **『索引(インデックス)』**を辞書として作成します。
    (1) ターゲット住所からまず「東京都千代田区」といった市区町村名を特定します。
       (2) 作成した『索引』を使い、全国18万件の中から「東京都千代田区」に属する
           数十件のデータだけを瞬時に引き出します。
       (3) その数十件の中だけで、詳細な文字列比較を行います。
       
       これにより、比較対象を**数千分の一以下に劇的に絞り込む**ことで、圧倒的な
       処理速度を実現しています。

    3. **データ形式の最適化 (NumPy配列の活用)**:
       処理開始時に、Pandas DataFrameの各列をNumPy配列に一括で変換しています。
       NumPy配列は、Pythonのリストに比べてメモリ効率が高く、ループ処理の中で
       個々のデータにアクセスする速度が非常に高速です。

    4. **階層的なロジックによる計算量の削減**:
       処理ロジックを「確実で高速なものから順に」実行する階層構造にしています。
       多くの住所が最初の『完全一致』ステージで処理を完了するため、計算コストが
       高い部分一致のロジックを実行する回数そのものを最小限に抑えています。

    ---
    § 参照データについて
    ---
    この関数は、国土交通省が提供する「位置参照情報」との連携を想定しています。
    以下のサイトから、都道府県別の大字・町丁目レベルのデータをダウンロードして利用
    できます。ダウンロードしたCSVファイルをPandasで読み込み、`address_reference`
    引数に渡してください。
    
    - **提供サイト**: 国土数値情報ダウンロードサイト
    - **URL**: https://nlftp.mlit.go.jp/ksj/
    - **データ形式**: CSVファイル
    - **必要な列**: '都道府県名', '市区町村名', '大字町丁目名', '緯度', '経度'

    Args:
        arr_target_address (list or np.ndarray): 
            緯度・経度を付与したい住所のリストまたはNumPy配列。
            例: ['東京都千代田区丸の内1-1', '大阪府大阪市北区梅田2-2']

        address_reference (pd.DataFrame): 
            参照用の住所マスターデータ。上記の「位置参照情報」から作成します。
            '都道府県名', '市区町村名', '大字町丁目名', '緯度', '経度' の列が必須です。

        match_threshold (float, optional): 
            「前方一致」と判断するための文字一致率のしきい値。0.0から1.0の間の
            値を指定します。デフォルトは 0.6 (60%)。

        set_match_high_threshold (float, optional): 
            「文字集合一致」と判断するための、非常に高い一致率のしきい値。
            誤字救済のためのロジックなので、高めの設定を推奨します。
            デフォルトは 0.9 (90%)。

    Returns:
        pd.DataFrame: ジオコーディングの結果を格納したデータフレーム。以下の列を含みます。
            - **元住所ID**: 元のリストでのインデックス番号。
            - **元住所**: 処理対象となった住所。
            - **変換後住所**: マッチした参照データの住所。
            - **緯度**: 付与された緯度。
            - **経度**: 付与された経度。
            - **マッチ種別**: '完全一致', '部分一致(前方)', '部分一致(文字集合)', '不一致'など。
            - **一致率**: マッチした際の一致率スコア。
            - **最近傍候補**: マッチしなかった場合に、最も近かった参照データの候補。
    """

    print('ジオコーディング処理を開始します...(3段階ロジック版)')
    print(f'処理対象の住所件数 = {len(arr_target_address)}')
    start_time = time.time()

    # STEP 1: マッチングのための事前準備
    ref_city_list = (address_reference['都道府県名'] + address_reference['市区町村名']).to_numpy()
    ref_citytown_list = (address_reference['都道府県名'] + address_reference['市区町村名'] + address_reference['大字町丁目名']).to_numpy()
    ref_town_list = address_reference['大字町丁目名'].to_numpy()
    ref_lat_list = address_reference['緯度'].to_numpy()
    ref_lon_list = address_reference['経度'].to_numpy()
    
    unique_cities = np.unique(ref_city_list)
    city_to_ref_indices = {city: np.where(ref_city_list == city)[0] for city in unique_cities}
    
    results = []

    # STEP 2: メイン処理 (住所を一つずつループ)
    for target_index, target_address in enumerate(arr_target_address):
        
        # --- 2-1. 市区町村名の特定 ---
        matched_city_name = None; candidate_ref_indices = []
        for city_name in unique_cities:
            if city_name in target_address:
                matched_city_name = city_name; candidate_ref_indices = city_to_ref_indices[city_name]; break
        if matched_city_name is None:
            results.append({'元住所ID': target_index, '元住所': target_address, '変換後住所': None, '緯度': 0.0, '経度': 0.0, 'マッチ種別': '市区町村不明', '一致率': 0.0, '最近傍候補ID': -1, '最近傍候補': '該当なし'})
            continue

        # --- STAGE 1: 完全一致のマッチング ---
        match_found_flag = False
        for ref_index in candidate_ref_indices:
            if ref_citytown_list[ref_index] in target_address:
                results.append({'元住所ID': target_index, '元住所': target_address, '変換後住所': ref_citytown_list[ref_index], '緯度': ref_lat_list[ref_index], '経度': ref_lon_list[ref_index], 'マッチ種別': '完全一致', '一致率': 1.0, '最近傍候補ID': ref_index, '最近傍候補': ref_citytown_list[ref_index]})
                match_found_flag = True; break
        if match_found_flag: continue

        # --- STAGE 2 & 3 のための準備: 両方のスコアを計算 ---
        best_fwd_score = -1; best_fwd_ref_index = -1
        best_set_score = -1; best_set_ref_index = -1
        target_town_part = target_address.replace(matched_city_name, '').replace('大字', '')
        
        for ref_index in candidate_ref_indices:
            ref_town_part_fwd = ref_town_list[ref_index].replace('大字', '')
            ref_town_part_set = ref_town_list[ref_index].replace('大字', '').replace('丁目','')
            
            fwd_score = common_prefix_length(target_town_part, ref_town_part_fwd)
            if fwd_score > best_fwd_score: best_fwd_score = fwd_score; best_fwd_ref_index = ref_index
                
            set_score = len(set(target_town_part.replace('丁目','')) & set(ref_town_part_set))
            if set_score > best_set_score: best_set_score = set_score; best_set_ref_index = ref_index
        
        # --- STAGE 2: 前方一致による判定 ---
        if best_fwd_ref_index != -1:
            chome_check_ok = ('丁目' in target_address) == ('丁目' in ref_town_list[best_fwd_ref_index])
            if chome_check_ok:
                ref_town_part_for_rate = ref_town_list[best_fwd_ref_index].replace('大字','')
                fwd_rate = best_fwd_score / len(ref_town_part_for_rate) if len(ref_town_part_for_rate) > 0 else 0.0
                if fwd_rate >= match_threshold:
                    results.append({'元住所ID': target_index, '元住所': target_address, '変換後住所': ref_citytown_list[best_fwd_ref_index], '緯度': ref_lat_list[best_fwd_ref_index], '経度': ref_lon_list[best_fwd_ref_index], 'マッチ種別': '部分一致(前方)', '一致率': fwd_rate, '最近傍候補ID': best_fwd_ref_index, '最近傍候補': ref_citytown_list[best_fwd_ref_index]})
                    continue

        # --- STAGE 3: 文字集合一致による判定 ---
        if best_set_ref_index != -1:
            chome_check_ok = ('丁目' in target_address) == ('丁目' in ref_town_list[best_set_ref_index])
            set_rate = 0.0
            if chome_check_ok:
                ref_town_part_for_rate = ref_town_list[best_set_ref_index].replace('大字','').replace('丁目','')
                set_rate = best_set_score / len(ref_town_part_for_rate) if len(ref_town_part_for_rate) > 0 else 0.0
            
            if set_rate >= set_match_high_threshold:
                results.append({'元住所ID': target_index, '元住所': target_address, '変換後住所': ref_citytown_list[best_set_ref_index], '緯度': ref_lat_list[best_set_ref_index], '経度': ref_lon_list[best_set_ref_index], 'マッチ種別': '部分一致(文字集合)', '一致率': set_rate, '最近傍候補ID': best_set_ref_index, '最近傍候補': ref_citytown_list[best_set_ref_index]})
                continue

        # --- 全てのステージで不一致だった場合 ---
        final_fail_rate = 0.0
        if best_set_ref_index != -1:
            ref_town_part_for_rate = ref_town_list[best_set_ref_index].replace('大字','').replace('丁目','')
            final_fail_rate = best_set_score / len(ref_town_part_for_rate) if len(ref_town_part_for_rate) > 0 else 0.0

        results.append({'元住所ID': target_index, '元住所': target_address, '変換後住所': None, '緯度': 0.0, '経度': 0.0, 'マッチ種別': '不一致', '一致率': final_fail_rate, '最近傍候補ID': best_set_ref_index, '最近傍候補': ref_citytown_list[best_set_ref_index] if best_set_ref_index!=-1 else '該当なし'})

    print(f"処理が完了しました。 (処理時間: {time.time() - start_time:.2f}秒)")
    return pd.DataFrame(results)
'''

Discussion