📴

PCからシリアルで送信 ラズパイがUSBに書き込み

に公開

概要

ラズパイのGPIOを介したシリアル通信でデータを受信し、そのデータをラズパイに挿したUSBメモリに書き込むシステムの備忘録です。
ここではデータの送信側はPCを使ってテストします。
ボーレートは1000000bpsで動作を確認しました。1500000bpsではエラーが発生しました。
テスト環境は以下の通り。
・PC
 Windows10 64bit
・ラズパイ
 Raspberry Pi 3b+
 Raspberry Pi OS Lite (64bit)
・シリアル変換モジュール
 Sunhayato MM-FT232HC

受信側ラズパイ準備

  1. Raspberry Pi ImagerでOSをMicroSDに書き込み、ラズパイに挿入
  2. sudo apt updateとsudo apt upgrade -yで更新
  3. 必要なパッケージを導入
sudo apt install -y python3 python3-pip python3-serial
  4. USBメモリの自動マウントを設定
    USBメモリを挿してlsblk, blkidでデバイス名とUUIDを確認
    /etc/fstabに追加
UUID=XXXX-XXXX  /mnt/usb  vfat  defaults,uid=XXX,gid=XXX,umask=002,nofail,x-systemd.automount  0  0

マウントポイントを作成

sudo mkdir -p /mnt/usb
sudo mount -a
  5. シリアルの有効化
    sudo raspi-configでInterface Options -> Serial Portを選択
    シリアルコンソールを無効
    シリアルポートハードウェアを有効に設定
    シリアルコンソールを無効にしていないと混線するので注意。
    再起動すると/dev/serial0が利用可能になる。
  6. 下記の受信用スクリプトを任意のディレクトリに置いて、実行権限を付与する。
chmod +x ~/test/test.py
  7. systemdサービス化(スクリプトの自動起動とrootでの実行)
    /etc/systemd/system/serial-recv.serviceを作成
[Unit]
Description=Serial to USB receiver (FINISH->sync+umount)
# Order this service after multi-user.target during boot.
After=multi-user.target

[Service]
# No User= line is given, so the script runs as root (it calls mount/umount).
Type=simple
WorkingDirectory=/home/XXX/test
ExecStart=/usr/bin/python3 /home/XXX/test/test.py
# Restart the receiver one second after it exits, whatever the exit reason.
Restart=always
RestartSec=1

[Install]
# Enabling the unit hooks it into multi-user.target so it starts at boot.
WantedBy=multi-user.target

有効化

sudo systemctl daemon-reload
sudo systemctl enable --now serial-recv.service

受信側ラズパイスクリプト

#!/usr/bin/env python3
import serial, struct, os, sys, time, subprocess
# Serial device the receiver opens.
PORT = "/dev/serial0"
# Baud rate used when opening PORT; the sending side must use the same value.
BAUD = 1000000
# Mount point of the USB memory; received files are written below it.
MOUNT ="/mnt/usb"

def is_mounted():
	"""Report whether MOUNT is currently a mount point.

	Delegates to `/bin/mountpoint -q`; an exit status of 0 means mounted.
	"""
	exit_status = subprocess.call(["/bin/mountpoint", "-q", MOUNT])
	return exit_status == 0

def ensure_mounted():
	"""Mount MOUNT unless it is already mounted.

	Raises:
		subprocess.CalledProcessError: if /bin/mount exits with a non-zero status.
	"""
	if is_mounted():
		return
	# Same order as before: flush filesystem buffers, then mount.
	subprocess.call(["/usr/bin/sync"])
	subprocess.check_call(["/bin/mount", MOUNT])

def safe_unmount():
	"""Flush pending writes and unmount MOUNT.

	Returns:
		True when /bin/umount exits with status 0, otherwise False.
	"""
	subprocess.call(["/usr/bin/sync"])
	return subprocess.call(["/bin/umount", MOUNT]) == 0

def recv_line(ser, timeout=10):
	"""Read one newline-terminated text line from the serial port.

	Args:
		ser: Open serial port object. Its read timeout is set to 0.5 s.
		timeout: Overall time budget in seconds for receiving the line.

	Returns:
		The line with carriage returns removed, undecodable bytes dropped and
		surrounding whitespace stripped, or None if no newline arrived within
		the budget (any partial data read so far is discarded).
	"""
	ser.timeout = 0.5
	buf = bytearray()
	# Use the monotonic clock: the wall clock can jump (e.g. when the system
	# time is synchronised after boot), which would cut short or stretch the
	# timeout.
	deadline = time.monotonic() + timeout
	while time.monotonic() < deadline:
		b = ser.read(1)
		if not b:
			continue
		if b == b'\n':
			return buf.decode(errors='ignore').strip()
		if b != b'\r':
			buf += b
	return None

def recv_u32(ser, timeout=10):
	"""Read a 4-byte little-endian unsigned integer from the serial port.

	Args:
		ser: Open serial port object. Its read timeout is set to 1 s.
		timeout: Overall time budget in seconds for receiving all four bytes.

	Returns:
		The decoded integer.

	Raises:
		TimeoutError: if fewer than four bytes arrived within the budget.
	"""
	need = 4
	buf = bytearray()
	ser.timeout = 1
	# Monotonic clock so a wall-clock adjustment cannot distort the timeout.
	deadline = time.monotonic() + timeout
	while len(buf) < need and time.monotonic() < deadline:
		part = ser.read(need - len(buf))
		if part:
			buf += part
	if len(buf) != need:
		raise TimeoutError("read u32 timeout")
	return struct.unpack("<I", bytes(buf))[0]

def _resolve_target(fname):
	"""Map a received file name to a path inside MOUNT.

	Returns None when the name is empty or would resolve outside MOUNT
	(absolute path or ".." components), so the sender cannot make this
	process write elsewhere on the filesystem.
	"""
	if not fname:
		return None
	root = os.path.normpath(MOUNT)
	path = os.path.normpath(os.path.join(root, fname))
	if not path.startswith(root + os.sep):
		return None
	return path


def _recv_exact(ser, n, timeout):
	"""Read exactly n bytes; return None if they do not arrive within timeout seconds."""
	ser.timeout = 0.05
	chunk = bytearray()
	deadline = time.monotonic() + timeout
	while len(chunk) < n and time.monotonic() < deadline:
		chunk += ser.read(n - len(chunk))
	return chunk if len(chunk) == n else None


def _handle_put(ser):
	"""Serve one PUT request: read name and size, then store the chunked body.

	Replies on the serial line with READY followed by one ACK per chunk and a
	final OK, or with ERR MOUNT / ERR SIZE / ERR NAME / ERR OPEN / ERR DATA.
	"""
	try:
		ensure_mounted()
	except (subprocess.SubprocessError, OSError):
		ser.write(b"ERR MOUNT\n")
		return

	fname = recv_line(ser, timeout=5) or ""
	size_s = recv_line(ser, timeout=5) or "0"
	try:
		size = int(size_s)
	except ValueError:
		ser.write(b"ERR SIZE\n")
		return

	path = _resolve_target(fname)
	if path is None:
		ser.write(b"ERR NAME\n")
		return

	try:
		os.makedirs(os.path.dirname(path), exist_ok=True)
		f = open(path, "wb")
	except OSError:
		ser.write(b"ERR OPEN\n")
		return

	ok = True
	saw_terminator = False
	try:
		with f:
			ser.write(b"READY\n")
			wrote = 0
			while wrote < size:
				n = recv_u32(ser, timeout=1.5)
				if n == 0:
					# Zero-length chunk: the sender signals end of data.
					saw_terminator = True
					break
				chunk = _recv_exact(ser, n, timeout=2.0)
				if chunk is None:
					ok = False
					break
				f.write(chunk)
				wrote += n
				ser.write(b"ACK\n")
			# Push the data to the device before reporting the result.
			f.flush()
			os.fsync(f.fileno())
	except OSError:
		# Covers TimeoutError from recv_u32 as well as file and serial I/O
		# errors; report failure instead of letting the service die.
		ok = False
	os.sync()

	if ok and not saw_terminator:
		# The sender still emits a zero-length terminator after the last
		# chunk. Consume it so its four NUL bytes do not end up in front of
		# the next command line. Best effort: a sender without a terminator
		# simply times out here.
		try:
			recv_u32(ser, timeout=0.5)
		except TimeoutError:
			pass

	ser.write(b"OK\n" if ok else b"ERR DATA\n")


def main():
	"""Run the receiver loop on PORT forever.

	Announces readiness with "RDY" and dispatches the commands PUT (receive a
	file), FINISH (sync and unmount the USB memory) and STATUS (report the
	mount state). Unknown or empty lines are ignored.
	"""
	with serial.Serial(PORT, BAUD, timeout=2) as ser:
		ser.reset_input_buffer()
		ser.reset_output_buffer()
		next_rdy = time.monotonic()

		while True:
			now = time.monotonic()
			if now >= next_rdy:
				# Heartbeat is best effort: a failed write must not stop the loop.
				try:
					ser.write(b"RDY\n")
				except OSError:
					pass
				next_rdy = now + 0.5

			cmd = recv_line(ser, timeout=1)
			if not cmd:
				continue
			if cmd == "PUT":
				_handle_put(ser)
			elif cmd == "FINISH":
				ser.write(b"BYE\n" if safe_unmount() else b"ERR UNMOUNT\n")
			elif cmd == "STATUS":
				ser.write(b"MOUNTED\n" if is_mounted() else b"UNMOUNTED\n")

if __name__ == "__main__":
	# Any exception escaping main() is reported on stderr and turned into
	# exit status 1.
	try:
		main()
	except Exception as exc:
		sys.stderr.write(f"ERR: {exc}\n")
		raise SystemExit(1)

送信側PCスクリプト

import os
import sys
import struct
import threading
import time
import tkinter as tk
from tkinter import ttk, filedialog, messagebox

# --- DnD (optional) ---
# Drag-and-drop relies on the tkinterdnd2 package. If importing it fails for
# any reason, USE_DND is cleared and the rest of the script checks that flag.
USE_DND = True
try:
    from tkinterdnd2 import DND_FILES, TkinterDnD
except Exception:
    USE_DND = False

import serial
from serial.tools import list_ports

# Window title of the sender application.
APP_TITLE = "Serial File Sender (PC → Raspberry Pi → USB)"
# Bytes read from the file and sent per chunk.
CHUNK_SIZE = 1024  # smaller chunks tend to be more stable at high baud rates

def list_serial_ports():
    """Return a list of (label, device) pairs for the detected serial ports.

    The label combines the device name and its description for display.
    """
    return [
        (f"{info.device}  -  {info.description}", info.device)
        for info in list_ports.comports()
    ]

class SenderThread(threading.Thread):
    """Daemon thread that sends one file to the receiver over a serial port.

    Sequence implemented by run(): wait for "RDY", send "PUT" plus the file
    name and size lines, wait for "READY", send each chunk as a little-endian
    u32 length followed by the bytes and wait for "ACK" after each one, send
    a zero length as terminator, then wait for "OK".

    Callbacks:
        log_cb(msg): receives progress and error messages.
        progress_cb(percent): receives an integer percentage (0-100).
        done_cb(success=bool): called at the end, except when the user
            canceled through stop_flag.
    """

    def __init__(self, port, baud, filepath, outname, log_cb, progress_cb, done_cb, stop_flag):
        super().__init__(daemon=True)
        self.port = port
        self.baud = baud
        self.filepath = filepath
        self.outname = outname
        self.log = log_cb
        self.progress = progress_cb
        self.done = done_cb
        self.stop_flag = stop_flag

    @staticmethod
    def _configure_port(ser):
        """Force 8N1 without flow control and clear both serial buffers."""
        ser.bytesize = serial.EIGHTBITS
        ser.parity = serial.PARITY_NONE
        ser.stopbits = serial.STOPBITS_ONE
        ser.xonxoff = False
        ser.rtscts = False
        ser.dsrdtr = False
        # Only some platform backends (e.g. Windows) offer set_buffer_size;
        # enlarging the buffers is best effort.
        if hasattr(ser, "set_buffer_size"):
            try:
                ser.set_buffer_size(rx_size=262144, tx_size=262144)
            except Exception:
                pass
        ser.reset_input_buffer()
        ser.reset_output_buffer()

    @staticmethod
    def _read_reply(ser, seconds):
        """Return the first non-empty line received within seconds, or ""."""
        deadline = time.monotonic() + seconds
        while time.monotonic() < deadline:
            line = ser.readline().decode(errors='ignore').strip()
            if line:
                return line
        return ""

    @staticmethod
    def _wait_for_rdy(ser, seconds):
        """Return True once a line equal to "RDY" arrives; other lines are skipped."""
        deadline = time.monotonic() + seconds
        while time.monotonic() < deadline:
            if ser.readline().decode(errors='ignore').strip() == "RDY":
                return True
        return False

    def _send_body(self, ser, size):
        """Send the file in CHUNK_SIZE pieces, waiting for an ACK after each.

        Returns False if stop_flag was set before the file was fully sent,
        True after the terminating zero-length chunk has been written.
        """
        sent = 0
        last_p = -1
        with open(self.filepath, "rb") as f:
            while True:
                if self.stop_flag.is_set():
                    return False
                chunk = f.read(CHUNK_SIZE)
                if not chunk:
                    ser.write(struct.pack("<I", 0))  # terminating chunk
                    return True
                ser.write(struct.pack("<I", len(chunk)))
                ser.write(chunk)
                ack = self._read_reply(ser, 2.5)
                if ack != "ACK":
                    raise RuntimeError(f"Bad ACK: {ack!r}")
                sent += len(chunk)
                p = int(sent * 100 / size) if size > 0 else 100
                # Report only when the integer percentage changes.
                if p != last_p:
                    self.progress(p)
                    last_p = p

    def run(self):
        """Perform the transfer and report the outcome through the callbacks."""
        try:
            size = os.path.getsize(self.filepath)
            self.log(f"Open port: {self.port} @ {self.baud} baud")
            with serial.Serial(self.port, self.baud, timeout=0.1, write_timeout=2) as ser:
                self._configure_port(ser)

                self.log("Waiting for RDY from Pi (10s)...")
                if not self._wait_for_rdy(ser, 10.0):
                    raise RuntimeError("Pi not RDY (no 'RDY' within 10s)")

                # PUT header: command, target file name, size in bytes.
                fname = self.outname or os.path.basename(self.filepath)
                ser.write(b"PUT\n")
                ser.write((fname + "\n").encode())
                ser.write((str(size) + "\n").encode())

                ready = self._read_reply(ser, 2.5)
                if ready != "READY":
                    raise RuntimeError(f"Pi not READY (got: {ready!r})")

                if not self._send_body(ser, size):
                    # Cancellation is logged only; done_cb is not invoked.
                    self.log("Canceled by user.")
                    return

                ok = self._read_reply(ser, 2.5)
                if ok != "OK":
                    raise RuntimeError(f"Did not receive OK (got: {ok!r})")

                self.progress(100)
                self.log("Transfer complete.")
                self.done(success=True)
        except Exception as e:
            self.log(f"ERROR: {e}")
            self.done(success=False)

class AppBase:
    """Main window: port and baud selection, file picker, progress bar, log,
    and the send / cancel / safe-eject actions."""

    def __init__(self, root):
        """Build all widgets inside root and fill the port list."""
        self.root = root
        self.root.title(APP_TITLE)
        self.stop_flag = threading.Event()
        self.sender_thread = None

        main = ttk.Frame(root, padding=12)
        main.grid(sticky="nsew")
        root.columnconfigure(0, weight=1)
        root.rowconfigure(0, weight=1)

        # Row 1: Port + Refresh + Baud
        row = ttk.Frame(main)
        row.grid(sticky="ew", pady=(0,6))
        row.columnconfigure(1, weight=1)

        ttk.Label(row, text="ポート:").grid(row=0, column=0, padx=(0,6))
        self.port_var = tk.StringVar()
        self.port_cb = ttk.Combobox(row, textvariable=self.port_var, state="readonly", width=40)
        self.port_cb.grid(row=0, column=1, sticky="ew")
        ttk.Button(row, text="再読み込み", command=self.reload_ports).grid(row=0, column=2, padx=(6,0))

        ttk.Label(row, text="ボーレート:").grid(row=0, column=3, padx=(16,6))
        self.baud_var = tk.StringVar(value="1000000")
        self.baud_entry = ttk.Entry(row, textvariable=self.baud_var, width=10)
        self.baud_entry.grid(row=0, column=4)

        # Row 2: File select (DnD area + Browse)
        row2 = ttk.Frame(main)
        row2.grid(sticky="ew", pady=(0,6))
        row2.columnconfigure(0, weight=1)

        self.file_var = tk.StringVar()
        self.dnd_label = ttk.Label(row2, text="ここにファイルをドラッグ&ドロップ(または[参照…])", relief="groove")
        self.dnd_label.grid(row=0, column=0, sticky="ew")
        ttk.Button(row2, text="参照…", command=self.browse_file).grid(row=0, column=1, padx=(6,0))

        # Output name (optional)
        row3 = ttk.Frame(main)
        row3.grid(sticky="ew", pady=(0,6))
        row3.columnconfigure(1, weight=1)
        ttk.Label(row3, text="保存名(任意):").grid(row=0, column=0, padx=(0,6))
        self.outname_var = tk.StringVar()
        ttk.Entry(row3, textvariable=self.outname_var).grid(row=0, column=1, sticky="ew")

        # Progress bar
        self.pbar = ttk.Progressbar(main, mode="determinate", maximum=100)
        self.pbar.grid(sticky="ew", pady=(4,6))

        # Buttons
        row4 = ttk.Frame(main)
        row4.grid(sticky="ew")
        ttk.Button(row4, text="送信", command=self.start_send).grid(row=0, column=0)
        ttk.Button(row4, text="中止", command=self.cancel_send).grid(row=0, column=1, padx=(6,0))
        ttk.Button(row4, text="安全に取り外し", command=self.safe_eject).grid(row=0, column=2, padx=(12,0))

        # Log (sixth child of main, i.e. grid row 5, which gets the extra space)
        self.log_text = tk.Text(main, height=10, state="disabled")
        self.log_text.grid(sticky="nsew", pady=(6,0))
        main.rowconfigure(5, weight=1)

        self.reload_ports()
        self.setup_dnd()

    # ---------- UI helpers ----------
    def setup_dnd(self):
        """Enable drag-and-drop on the label, or fall back to click-to-browse."""
        if USE_DND and hasattr(self.root, "drop_target_register"):
            self.dnd_label.drop_target_register(DND_FILES)
            self.dnd_label.dnd_bind("<<Drop>>", self.on_drop)
        else:
            self.dnd_label.bind("<Button-1>", lambda e: self.browse_file())

    def on_drop(self, event):
        """Take the first dropped path as the file to send."""
        paths = self.root.splitlist(event.data)
        if not paths:
            return
        self.file_var.set(paths[0])
        self.dnd_label.config(text=paths[0])

    def browse_file(self):
        """Let the user pick the file to send with a file dialog."""
        path = filedialog.askopenfilename()
        if path:
            self.file_var.set(path)
            self.dnd_label.config(text=path)

    def reload_ports(self):
        """Refresh the port combobox and select the first entry, if any."""
        ports = list_serial_ports()
        self.port_cb["values"] = [label for label, dev in ports]
        if ports:
            self.port_cb.current(0)

    def log(self, msg):
        """Append msg as a new line to the read-only log widget."""
        self.log_text.configure(state="normal")
        self.log_text.insert("end", msg + "\n")
        self.log_text.see("end")
        self.log_text.configure(state="disabled")
        self.root.update_idletasks()

    def set_progress(self, p):
        """Set the progress bar to p (0-100)."""
        self.pbar["value"] = p
        self.root.update_idletasks()

    def _on_ui_thread(self, func):
        """Wrap func so that calling the wrapper queues func on the Tk event loop.

        The sender thread uses these wrappers, so widget updates and modal
        message boxes run in the event loop instead of on the worker thread.
        """
        def dispatch(*args, **kwargs):
            self.root.after(0, lambda: func(*args, **kwargs))
        return dispatch

    def _selected_device(self):
        """Return the device name of the selected port.

        Shows an error dialog and returns None when nothing is selected or
        the selected label no longer matches a detected port.
        """
        port_label = self.port_var.get().strip()
        if not port_label:
            messagebox.showerror("エラー", "ポートを選択してください。")
            return None
        for label, device in list_serial_ports():
            if label == port_label:
                return device
        messagebox.showerror("エラー", "有効なポートが見つかりません。")
        return None

    # ---------- Actions ----------
    def start_send(self):
        """Validate the inputs and start a SenderThread for the chosen file."""
        if self.sender_thread and self.sender_thread.is_alive():
            messagebox.showwarning("送信中", "すでに送信中です。中止するには『中止』を押してください。")
            return

        dev = self._selected_device()
        if dev is None:
            return

        try:
            baud = int(self.baud_var.get().strip())
        except ValueError:
            messagebox.showerror("エラー", "ボーレートが不正です。例: 115200")
            return

        path = self.file_var.get().strip()
        if not path or not os.path.isfile(path):
            messagebox.showerror("エラー", "送信するファイルを指定してください。")
            return

        outname = self.outname_var.get().strip() or None

        self.set_progress(0)
        self.log(f"Start transfer: {path}")
        self.stop_flag.clear()

        self.sender_thread = SenderThread(
            dev, baud, path, outname,
            log_cb=self._on_ui_thread(self.log),
            progress_cb=self._on_ui_thread(self.set_progress),
            done_cb=self._on_ui_thread(self.on_done),
            stop_flag=self.stop_flag
        )
        self.sender_thread.start()

    def cancel_send(self):
        """Ask a running transfer to stop at the next chunk boundary."""
        if self.sender_thread and self.sender_thread.is_alive():
            self.stop_flag.set()
            self.log("Cancel requested…")

    def on_done(self, success):
        """Show the final result of a transfer in a message box."""
        if success:
            messagebox.showinfo("完了", "送信が完了しました。")
        else:
            messagebox.showerror("エラー", "送信に失敗しました。")

    def safe_eject(self):
        """Send FINISH so the receiver syncs and unmounts, then report its reply."""
        # Not allowed while a transfer is running.
        if self.sender_thread and self.sender_thread.is_alive():
            messagebox.showwarning("実行不可", "送信中は取り外せません。完了後に実行してください。")
            return

        dev = self._selected_device()
        if dev is None:
            return

        try:
            with serial.Serial(dev, int(self.baud_var.get()), timeout=0.2, write_timeout=1) as ser:
                # Wait briefly (2 s) for RDY; FINISH is sent either way.
                deadline = time.monotonic() + 2.0
                while time.monotonic() < deadline:
                    s = ser.readline().decode(errors='ignore').strip()
                    if s == "RDY":
                        break
                ser.write(b"FINISH\n")
                # Wait up to 5 s; the first non-empty line is taken as the reply.
                bye = ""
                deadline = time.monotonic() + 5.0
                while time.monotonic() < deadline:
                    s = ser.readline().decode(errors='ignore').strip()
                    if s:
                        bye = s
                        break
            if bye == "BYE":
                messagebox.showinfo("安全に取り外し", "USBを安全に取り外せます。")
                self.log("Safe eject: BYE")
            else:
                messagebox.showerror("エラー", f"取り外しに失敗しました(応答: {bye!r})")
                self.log(f"Safe eject failed: {bye!r}")
        except Exception as e:
            messagebox.showerror("エラー", f"取り外しに失敗しました: {e}")
            self.log(f"ERROR (eject): {e}")

def main():
    """Create the root window, build the UI and enter the Tk main loop.

    The root is a TkinterDnD window when drag-and-drop support was imported,
    otherwise a plain Tk window.
    """
    root = TkinterDnD.Tk() if USE_DND else tk.Tk()
    AppBase(root)
    root.geometry("780x520")
    root.mainloop()

# Start the GUI only when this file is executed as a script.
if __name__ == "__main__":
    main()

Discussion