💳️

交通系ICを読み込んでみた

に公開

はじめに

はじめまして。来週に9年ほど推したシリーズのフィナーレライブが迫っており、ソワソワしているダンプです。

今回は、NFCを使って社内の備品管理システムを作った際の知見というか備忘録をまとめてみました。
完成したシステムについてはこちらの記事もご覧ください。
https://www.phonogram.co.jp/works/2024/05/icsuicapasmo.html

システム概要

数十年前に案件の開発用で買ったとされる、物置の肥やしになっていた、
SONY製のNFCリーダー(RC-S370)を再利用し、交通系ICやNFCタグを付けた鍵等を管理するシステムを構築。

システム構成

交通系ICや社用車キー等を保管している場所に専用PCを設置し、スキャナを接続して運用。

  1. スキャナー部分: PythonでNFCスキャンを実行
  2. データベース: Firebase Firestoreにリアルタイムでデータをセット
  3. 管理画面: PCから専用ページをブラウザで開き、Firestoreをウォッチ
  4. 運用フロー: リアルタイムでスキャンされたカードが選択され、誰が借りたのかを記録

前提条件・使用技術

  • 端末側(NFCスキャン部分)

    • OS: Windows PC
    • Python: Python 3.x
    • NFCリーダー: SONY RC-S370(または同等のFeliCa対応リーダー)
    • ライブラリ: nfcpy、firebase-admin、pygame(音声用)
      ※nfcpyをWindowsで使用する場合は、「WinUSB」と「libusb」を手動インストールする必要があります。
  • WEB側(管理画面)

    • フロント: 素のHTML、CSS、JS(画面がシンプルなので)
    • バック: PHPでSlimフレームワークを使用

NFCリーダーですが、nfcpyが対応しているものにする必要があります。
https://github.com/nfcpy/nfcpy/blob/master/src/nfc/clf/device.py#L42-L52

交通系ICカードの読み込み実装

基本的には、NFCのID(IDm)をキーにしてカードを識別しています。
交通系ICについては履歴・残高という要素もあるので、それらも取得できる仕組みにしています。

地域コードを考慮した実装の必要性

検索して出てくる「Python 交通系IC」の記事のコードを拝見すると、「地域コード」が考慮されていない実装が多く見受けられます。
これは関東圏では地域コードを指定しなくても概ね問題なく動作する駅コード体系になっているからだと思います。

しかし、私は広島という田舎に住んでいるため、地域コードも正しく読み取れるように実装する必要がありました。

なぜ地域コードが重要なのか?

地域コードを考慮しないと、以下のような問題が発生しちゃいます:

  • 駅名の誤認識: 同じ線区コード・駅コードでも地域が違うと全く別の駅になってしまう
  • : JR西日本の広島地区と関東圏で同じ駅コードが使われている場合、地域コードがないと「広島駅」が「東京駅」として認識されるなど
  • 履歴の正確性: 交通系ICの利用履歴を正確に表示するためには、正しい駅名が必須

Python実装例

ちょっとコードが長めなので、見たい場合はアコーディオンを開いてください。

開いてコードを見る
import csv
import binascii
import os
import struct
import sys
import traceback
import firebase_admin
from firebase_admin import credentials
from firebase_admin import firestore
import pygame.mixer
import time

sys.path.append(os.path.dirname(os.path.abspath(__file__)) + '/nfcpy')

# Use a service account.
cred = credentials.Certificate('firebase-certificate.json')
app = firebase_admin.initialize_app(cred)
db = firestore.client()
card_ref = db.collection("card").document("HOGEHOGEHOGE")

import nfc

# env的な
num_blocks = 20
service_code = 0x090f #「利用履歴」を指定

pygame.mixer.init() #初期化
 
class StationRecord(object):
  db = None

  def __init__(self, row):
    self.area_key = int(row[0], 10)
    self.line_key = int(row[1], 16)
    self.station_key = int(row[2], 16)
    self.company_value = row[3]
    self.line_value = row[4]
    self.station_value = row[5]

  @classmethod
  def get_none(cls):
    # 駅データが見つからないときに使う
    return cls(["0", "0", "0", "None", "None", "None"])
  @classmethod
  def get_db(cls, filename):
    # 駅データのcsvを読み込んでキャッシュする
    if cls.db == None:
      cls.db = []
      for row in csv.reader(open(filename, 'r', encoding='utf-8'), delimiter=',', dialect=csv.excel_tab):
        cls.db.append(cls(row))
    return cls.db
  @classmethod
  def get_station(cls, region_code, line_key, station_key):
    # 線区コードと駅コードに対応するStationRecordを検索する
    for station in cls.get_db("ekicode.csv"):
      if station.area_key == region_code and station.line_key == line_key and station.station_key == station_key:
        return station
    return cls.get_none()
 
class HistoryRecord(object):
  def __init__(self, data):
    # ビッグエンディアンでバイト列を解釈する
    row_be = struct.unpack('>2B2H4BH4B', data)
    # リトルエンディアンでバイト列を解釈する
    row_le = struct.unpack('<2B2H4BH4B', data)
 
    self.db = None
    self.console = self.get_console(row_be[0])
    self.process = self.get_process(row_be[1])
    self.year = self.get_year(row_be[3])
    self.month = self.get_month(row_be[3])
    self.day = self.get_day(row_be[3])
    self.balance = row_le[8]

    in_region_code = (row_be[12] >> 6) & 0x7f
    out_region_code = (row_be[12] >> 4) & 0b11
 
    self.in_station = StationRecord.get_station(in_region_code, row_be[4], row_be[5])
    self.out_station = StationRecord.get_station(out_region_code, row_be[6], row_be[7])
 
  @classmethod
  def get_console(cls, key):
    # よく使われそうなもののみ対応
    return {
      0x03: "精算機",
      0x04: "携帯型端末",
      0x05: "車載端末",
      0x12: "券売機",
      0x16: "改札機",
      0x1c: "乗継精算機",
      0xc8: "自販機",
    }.get(key)
  @classmethod
  def get_process(cls, key):
    # よく使われそうなもののみ対応
    return {
      0x01: "運賃支払",
      0x02: "チャージ",
      0x0f: "バス",
      0x46: "物販",
    }.get(key)
  @classmethod
  def get_year(cls, date):
    return (date >> 9) & 0x7f
  @classmethod
  def get_month(cls, date):
    return (date >> 5) & 0x0f
  @classmethod
  def get_day(cls, date):
    return (date >> 0) & 0x1f
 
def connected(tag):
  pygame.mixer.music.load("scan.mp3") #読み込み
  pygame.mixer.music.play(1) #再生

  print(tag)
  idm = binascii.hexlify(tag._nfcid).decode().upper()
  print("IDm : " + str(idm))
 
  if(tag):
    if not tag.ndef:
      try:
        sc = nfc.tag.tt3.ServiceCode(service_code >> 6, service_code & 0x3f)

        # NOTE: ICカードに記録されている全情報を取得したい場合は、下記コメントアウトを取ればOK
        # ループ処理
        # for i in range(num_blocks):
        #   bc = nfc.tag.tt3.BlockCode(i,service=0)
        #   data = tag.read_without_encryption([sc],[bc])
        #   history = HistoryRecord(bytes(data))
        #   print(f"=== {i:02} ===")
        #   print(f"端末種: {history.console}")
        #   print(f"処理: {history.process}")
        #   print(f"日付: {history.year:02d}-{history.month:02d}-{history.day:02d}")
        #   print(f"入線区: {history.in_station.company_value}-{history.in_station.line_value}")
        #   print(f"入駅順: {history.in_station.station_value}")
        #   print(f"出線区: {history.out_station.company_value}-{history.out_station.line_value}")
        #   print(f"出駅順: {history.out_station.station_value}")
        #   print(f"残高: {history.balance}")
        #   print("BIN: ")
        #   print(" ".join([f'{s:02x}' for s in data]))

        # NOTE: 最新情報だけを取得したいので、ブロック指定
        # 1ブロック目のみ読み込み
        bc = nfc.tag.tt3.BlockCode(0,service=0)
        data = tag.read_without_encryption([sc],[bc])
        history = HistoryRecord(bytes(data))
        print(f"残高: {history.balance}")

        # Firestoreに書き込み
        card_ref.update(
          {
            "card_id": idm,
            "balance": history.balance,
          }
        )
        
      except Exception as e:
        print(f"Error reading service code or block: {e}")
        # Firestoreに書き込み
        card_ref.update(
          {
              "card_id": idm,
              "balance": 0,  # 残高をnullで登録
          }
        )
    else:
      print("error: tag isn't Type3Tag")
      # Firestoreに書き込み
      # 交通系ではないので、残高はnullで登録
      card_ref.update(
        {
          "card_id": idm,
          "balance": 0,
        }
      )
  else:
    print("error: tag isn't tag object")

  time.sleep(0.5)
  return True # Trueを返しておくとタグが存在しなくなるまで待機され、離すとon_releaseが発火する

def released(tag):
  print("released")
 
  # Firestoreに書き込み
  card_ref.update(
    {
      "card_id": "",
      "balance": 0,
    }
  )

with nfc.ContactlessFrontend("usb") as clf:
  while True:
    clf.connect(rdwr={"on-connect": connected, "on-release": released})

コードの中身を解説

StationRecordクラス

駅情報を管理するクラスです。CSVファイルから駅データを読み込み、地域コード・線区コード・駅コードから駅名を検索できます。

class StationRecord(object):
  def __init__(self, row):
    self.area_key = int(row[0], 10)      # 地域コード
    self.line_key = int(row[1], 16)      # 線区コード(16進数)
    self.station_key = int(row[2], 16)   # 駅コード(16進数)
    self.company_value = row[3]          # 会社名
    self.line_value = row[4]             # 路線名
    self.station_value = row[5]          # 駅名

重要なのはget_stationメソッドで、地域コード・線区コード・駅コードの3つの組み合わせで駅を特定します。
駅コードの情報は、有志の方が作成・メンテナンスしてくださっているCSVデータを活用しています。

HistoryRecordクラス

交通系ICカードの履歴データを解析するクラスです。16バイトのバイナリデータから各種情報を抽出します。
構造は下記のサイトを参考にしています。

データ構造は以下のようになっています:

バイト位置 内容 備考
0 機器種別 改札機、券売機など
1 利用種別 運賃支払、チャージなど
2 支払種別? 現金、VIEW等
3 入出場種別 入場、出場等
4-5 出場日付・処理日付 年(7bit)+月(4bit)+日(5bit)
6-7 入場駅コード 路線コード + 駅順コード
8-9 出場駅コード 路線コード + 駅順コード
10-11 残高 リトルエンディアン
12 不明
13-14 取引通番
15 地域コード情報 入場駅(2bit)+出場駅(2bit)+不明(4bit)
def __init__(self, data):
    # ビッグエンディアンでバイト列を解釈する
    row_be = struct.unpack('>2B2H4BH4B', data)
    # リトルエンディアンで残高を解釈する
    row_le = struct.unpack('<2B2H4BH4B', data)
    
    # 各データの抽出
    self.console = self.get_console(row_be[0])    # 機器種別
    self.process = self.get_process(row_be[1])    # 利用種別
    self.balance = row_le[8]                      # 残高(リトルエンディアン)
    
    # 地域コードの抽出
    in_region_code = (row_be[12] >> 6) & 0x7f     # 入場駅の地域コード(上位2bit)
    out_region_code = (row_be[12] >> 4) & 0b11    # 出場駅の地域コード(3-4bit)

地域コードの抽出

  • 地域コードは15バイト目(F列)に格納されている
  • 入場駅の地域コードは上位2ビット
  • 出場駅の地域コードは3-4ビット目
  • この地域コードと駅コードの組み合わせで正確な駅名を特定

メイン処理(connected関数)

カードがスキャンされた時の処理です。

  1. 音声再生: スキャン時に音を鳴らす
  2. IDm取得: カードの固有ID(16進数)を取得
  3. 履歴データ読み込み: サービスコード0x090F(利用履歴)から最新データを取得
  4. Firebase更新: 取得したデータをFirestoreにリアルタイム保存
# サービスコード指定(利用履歴)
service_code = 0x090f
sc = nfc.tag.tt3.ServiceCode(service_code >> 6, service_code & 0x3f)

# 最新の履歴データ(0ブロック目)のみ読み込み
bc = nfc.tag.tt3.BlockCode(0, service=0)
data = tag.read_without_encryption([sc], [bc])
history = HistoryRecord(bytes(data))

カードが離れた時の処理(released関数)

カードが離れた場合は、Firestoreの値を初期化し、カードが選択されていない状態にしています。

def released(tag):
  print("released")
 
  # Firestoreに書き込み
  card_ref.update(
    {
      "card_id": "",
      "balance": 0,
    }
  )

まとめ

今回は交通系ICカードの読み込みで、特に地域コードを考慮した実装について解説しました。

ポイント

  1. 地域コードの重要性: 関東圏以外(たぶん)では地域コードを正しく解析する必要がある
  2. データ構造の理解: FeliCaの技術仕様を理解してバイナリデータを正しくパース
  3. リアルタイム連携: Firebase Firestoreを使った端末非依存な実装

地域コードを考慮すれば交通系ICカードの情報を正確に読み取ることができます。
同じような課題に直面している方の参考になれば幸いです。

参考

http://jennychan.web.fc2.com/format/suica.html
https://github.com/furugori/ekicode
https://zenn.dev/3w36zj6/articles/d3894e83cb7423
https://thinkami.hatenablog.com/entry/2018/01/20/160713

AUN Tech Blog

Discussion