📴
PCからシリアルで送信 ラズパイがUSBに書き込み
概要
ラズパイのGPIOを介したシリアル通信でデータを受信し、そのデータをラズパイに挿したUSBメモリに書き込むシステムの備忘録です。
ここではデータの送信側はPCを使ってテストします。
ボーレートは1000000で動作、1500000はエラーを確認。
テスト環境は以下の通り。
・PC
Windows10 64bit
・ラズパイ
Raspberry Pi 3b+
Raspberry Pi OS Lite (64bit)
・シリアル変換モジュール
Sunhayato MM-FT232HC
受信側ラズパイ準備
- Raspberry Pi ImagerでOSをMicroSDに書き込み、ラズパイに挿入
- sudo apt updateとsudo apt upgrade -yで更新
- 必要なパッケージを導入
sudo apt install -y python3 python3-pip python3-serial
- USBメモリの自動マウントを設定
USBメモリを挿してlsblk, blkidでデバイス名とUUIDを確認
/etc/fstabに追加
UUID=XXXX-XXXX /mnt/usb vfat defaults,uid=XXX,gid=XXX,umask=002,nofail,x-systemd.automount 0 0
マウントポイントを作成
sudo mkdir -p /mnt/usb
sudo mount -a
- シリアルの有効化
sudo raspi-configでInterface Options -> Serial Portを選択
シリアルコンソールを無効
シリアルポートハードウェアを有効に設定
シリアルコンソールを無効にしていないと混線するので注意。
再起動すると/dev/serial0が利用可能になる。
- 下記の受信用スクリプトを任意のディレクトリに置いて、実行権限を付与する。
chmod +x ~/test/test.py
- systemdサービス化(スクリプトの自動起動とrootでの実行)
/etc/systemd/system/serial-recv.serviceを作成
[Unit]
Description=Serial to USB receiver (FINISH->sync+umount)
After=multi-user.target
[Service]
Type=simple
WorkingDirectory=/home/XXX/test
ExecStart=/usr/bin/python3 /home/XXX/test/test.py
Restart=always
RestartSec=1
[Install]
WantedBy=multi-user.target
有効化
sudo systemctl daemon-reload
sudo systemctl enable --now serial-recv.service
受信側ラズパイスクリプト
#!/usr/bin/env python3
import serial, struct, os, sys, time, subprocess
# Serial device opened by the receiver.
PORT = "/dev/serial0"
# Baud rate used when opening PORT; the sender must be set to the same value.
BAUD = 1000000
# Directory where the USB memory stick is mounted; received files are written below it.
MOUNT ="/mnt/usb"
def is_mounted():
    """Report whether MOUNT is currently a mount point.

    Runs ``/bin/mountpoint -q MOUNT`` and returns True when it exits with
    status 0, False otherwise.
    """
    result = subprocess.run(["/bin/mountpoint", "-q", MOUNT])
    mounted = result.returncode == 0
    return mounted
def ensure_mounted():
    """Mount MOUNT unless it is already mounted.

    Raises:
        subprocess.CalledProcessError: if ``/bin/mount MOUNT`` exits non-zero.
    """
    if is_mounted():
        return
    sync_cmd = ["/usr/bin/sync"]
    mount_cmd = ["/bin/mount", MOUNT]
    # sync is best-effort (exit status ignored); only the mount itself must succeed.
    subprocess.run(sync_cmd)
    subprocess.run(mount_cmd, check=True)
def safe_unmount():
    """Run sync, then unmount MOUNT.

    Returns:
        True when ``/bin/umount MOUNT`` exits with status 0, else False.
    """
    subprocess.run(["/usr/bin/sync"])
    umount_status = subprocess.run(["/bin/umount", MOUNT]).returncode
    return umount_status == 0
def recv_line(ser, timeout=10):
    """Read one newline-terminated text line from a serial-like object.

    Bytes are read one at a time so that nothing after the newline is
    consumed (binary chunk data may follow a text line on the same stream).

    Args:
        ser: object with a writable ``timeout`` attribute and a ``read(n)``
            method that returns ``b""`` when nothing arrived in time.
        timeout: overall time budget in seconds.

    Returns:
        The line decoded with undecodable bytes dropped, carriage returns
        removed and surrounding whitespace stripped, or None when no newline
        arrived within ``timeout`` (any partial line is discarded).
    """
    ser.timeout = 0.5
    line = bytearray()
    # Monotonic clock: a wall-clock step (e.g. time sync on a board without
    # an RTC) must neither cut the wait short nor stretch it.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        byte = ser.read(1)
        if not byte:
            continue
        if byte == b"\n":
            return line.decode(errors="ignore").strip()
        if byte != b"\r":
            line += byte
    return None
def recv_u32(ser, timeout=10):
    """Read a 4-byte little-endian unsigned integer from a serial-like object.

    Args:
        ser: object with a writable ``timeout`` attribute and a ``read(n)``
            method that may return fewer than ``n`` bytes.
        timeout: overall time budget in seconds.

    Returns:
        The decoded integer.

    Raises:
        TimeoutError: if fewer than 4 bytes arrived within ``timeout``.
    """
    need = 4
    buf = bytearray()
    ser.timeout = 1
    # Monotonic clock so that wall-clock adjustments cannot distort the wait.
    deadline = time.monotonic() + timeout
    while len(buf) < need and time.monotonic() < deadline:
        part = ser.read(need - len(buf))
        if part:
            buf += part
    if len(buf) != need:
        raise TimeoutError("read u32 timeout")
    return struct.unpack("<I", bytes(buf))[0]
def _resolve_target(fname):
    """Map a sender-supplied file name to a path inside MOUNT, or None if unsafe."""
    if not fname:
        return None
    root = os.path.realpath(MOUNT)
    path = os.path.realpath(os.path.join(root, fname))
    # The name comes straight off the serial line. Refuse absolute paths and
    # ".." components that would place the file outside the mount point.
    if path == root or os.path.commonpath([root, path]) != root:
        return None
    return path


def _receive_body(ser, f, size):
    """Read length-prefixed chunks from ser into f; return True on success."""
    wrote = 0
    while wrote < size:
        try:
            n = recv_u32(ser, timeout=1.5)
        except TimeoutError:
            # Sender stalled or cancelled: report failure instead of letting
            # the exception terminate the receiver.
            return False
        if n == 0:
            # Zero-length chunk is the sender's end-of-data marker.
            break
        chunk = bytearray()
        deadline = time.monotonic() + 2.0
        ser.timeout = 0.05
        while len(chunk) < n and time.monotonic() < deadline:
            part = ser.read(n - len(chunk))
            if part:
                chunk += part
        if len(chunk) != n:
            return False
        f.write(chunk)
        wrote += n
        ser.write(b"ACK\n")
    return True


def _handle_put(ser):
    """Serve one PUT command: read name and size, receive the file, reply."""
    try:
        ensure_mounted()
    except Exception:
        ser.write(b"ERR MOUNT\n")
        return
    fname = recv_line(ser, timeout=5) or ""
    size_s = recv_line(ser, timeout=5) or "0"
    try:
        size = int(size_s)
    except ValueError:
        ser.write(b"ERR SIZE\n")
        return
    path = _resolve_target(fname)
    if path is None:
        ser.write(b"ERR NAME\n")
        return
    try:
        os.makedirs(os.path.dirname(path), exist_ok=True)
        f = open(path, "wb")
    except OSError:
        ser.write(b"ERR OPEN\n")
        return
    ser.write(b"READY\n")
    ok = False
    try:
        with f:
            ok = _receive_body(ser, f, size)
            # Push the data to the device before telling the sender it is safe.
            f.flush()
            os.fsync(f.fileno())
        os.sync()
    except OSError:
        # Any I/O failure while receiving or flushing is reported as a data
        # error rather than as a false "OK".
        ok = False
    ser.write(b"OK\n" if ok else b"ERR DATA\n")


def main():
    """Run the receiver loop on the serial port forever.

    Protocol (text lines end with a newline):
      * "RDY" is written roughly every 0.5 s or more while idle.
      * "PUT", then file name, then size in bytes: replies "READY", then
        expects chunks of <u32 little-endian length><payload>, each
        answered with "ACK"; a zero length ends the data. Final reply is
        "OK" or "ERR DATA". Earlier failures reply "ERR MOUNT", "ERR SIZE",
        "ERR NAME" or "ERR OPEN".
      * "FINISH": sync and unmount; replies "BYE" or "ERR UNMOUNT".
      * "STATUS": replies "MOUNTED" or "UNMOUNTED".
    Unknown commands are ignored.
    """
    ser = serial.Serial(PORT, BAUD, timeout=2)
    ser.reset_input_buffer()
    ser.reset_output_buffer()
    next_rdy = time.monotonic()
    while True:
        if time.monotonic() >= next_rdy:
            try:
                ser.write(b"RDY\n")
            except (serial.SerialException, OSError):
                # The beacon is best-effort; a failed write must not stop the loop.
                pass
            next_rdy = time.monotonic() + 0.5
        cmd = recv_line(ser, timeout=1)
        if cmd:
            # The sender's zero-length end chunk (four NUL bytes) is not
            # consumed when the byte count is reached first, so it can end
            # up in front of the next command line.
            cmd = cmd.strip("\x00 \t")
        if not cmd:
            continue
        if cmd == "PUT":
            _handle_put(ser)
        elif cmd == "FINISH":
            ser.write(b"BYE\n" if safe_unmount() else b"ERR UNMOUNT\n")
        elif cmd == "STATUS":
            ser.write(b"MOUNTED\n" if is_mounted() else b"UNMOUNTED\n")
if __name__ == "__main__":
    # Script entry point: any unexpected failure is reported on stderr and
    # turned into exit status 1.
    try:
        main()
    except Exception as exc:
        sys.stderr.write(f"ERR: {exc}\n")
        sys.exit(1)
送信側PCスクリプト
import os
import sys
import struct
import threading
import time
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
# --- DnD (optional) ---
USE_DND = True
try:
from tkinterdnd2 import DND_FILES, TkinterDnD
except Exception:
USE_DND = False
import serial
from serial.tools import list_ports
# Title set on the root window.
APP_TITLE = "Serial File Sender (PC → Raspberry Pi → USB)"
# Number of bytes read from the file and sent per chunk.
CHUNK_SIZE = 1024 # smaller chunks tend to be more stable at high speeds
def list_serial_ports():
    """Return a list of (label, device) tuples for the available serial ports.

    The label is "<device> - <description>" and is what the UI displays;
    the device string is what gets passed to serial.Serial.
    """
    return [
        (f"{info.device} - {info.description}", info.device)
        for info in list_ports.comports()
    ]
class SenderThread(threading.Thread):
    """Daemon thread that sends one file over a serial port.

    Sequence: wait for "RDY", send "PUT" + name + size, wait for "READY",
    stream the file as <u32 little-endian length><payload> chunks (each
    acknowledged with "ACK"), send a zero-length chunk, wait for "OK".

    Args:
        port: serial device name to open.
        baud: baud rate.
        filepath: path of the local file to send.
        outname: name to save under on the receiver; falls back to the
            base name of ``filepath`` when empty or None.
        log_cb: called with a message string.
        progress_cb: called with an integer percentage.
        done_cb: called as ``done_cb(success=bool)`` on completion or
            error; it is not called when the transfer is cancelled.
        stop_flag: threading.Event; when set, the transfer stops before the
            next chunk.

    All callbacks are invoked from this thread.
    """

    def __init__(self, port, baud, filepath, outname, log_cb, progress_cb, done_cb, stop_flag):
        super().__init__(daemon=True)
        self.port = port
        self.baud = baud
        self.filepath = filepath
        self.outname = outname
        self.log = log_cb
        self.progress = progress_cb
        self.done = done_cb
        self.stop_flag = stop_flag

    @staticmethod
    def _wait_line(ser, timeout, expect=None):
        """Return the first non-empty line (or the first equal to expect); "" on timeout."""
        # Monotonic deadline so a wall-clock adjustment cannot distort the wait.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            # Undecodable bytes are dropped rather than raising.
            line = ser.readline().decode(errors="ignore").strip()
            if not line:
                continue
            if expect is None or line == expect:
                return line
        return ""

    def _send_body(self, ser, size):
        """Stream the file in acknowledged chunks; return False if cancelled."""
        sent = 0
        last_percent = -1
        with open(self.filepath, "rb") as f:
            while True:
                if self.stop_flag.is_set():
                    self.log("Canceled by user.")
                    return False
                chunk = f.read(CHUNK_SIZE)
                if not chunk:
                    # Zero-length chunk marks the end of the data.
                    ser.write(struct.pack("<I", 0))
                    return True
                ser.write(struct.pack("<I", len(chunk)))
                ser.write(chunk)
                # Wait up to 2.5 s for the acknowledgement, skipping empty lines.
                ack = self._wait_line(ser, 2.5)
                if ack != "ACK":
                    raise RuntimeError(f"Bad ACK: {ack!r}")
                sent += len(chunk)
                percent = int(sent * 100 / size) if size > 0 else 100
                if percent != last_percent:
                    self.progress(percent)
                    last_percent = percent

    def run(self):
        """Perform the transfer; report the outcome through the callbacks."""
        try:
            size = os.path.getsize(self.filepath)
            self.log(f"Open port: {self.port} @ {self.baud} baud")
            with serial.Serial(self.port, self.baud, timeout=0.1, write_timeout=2) as ser:
                # Explicit settings: 8N1, no flow control.
                ser.bytesize = serial.EIGHTBITS
                ser.parity = serial.PARITY_NONE
                ser.stopbits = serial.STOPBITS_ONE
                ser.xonxoff = False
                ser.rtscts = False
                ser.dsrdtr = False
                # Enlarge the driver buffers where the backend offers it (Windows).
                if hasattr(ser, "set_buffer_size"):
                    try:
                        ser.set_buffer_size(rx_size=262144, tx_size=262144)
                    except Exception:
                        pass
                ser.reset_input_buffer()
                ser.reset_output_buffer()

                # Wait up to 10 s for "RDY"; any other line is ignored.
                self.log("Waiting for RDY from Pi (10s)...")
                if self._wait_line(ser, 10.0, expect="RDY") != "RDY":
                    raise RuntimeError("Pi not RDY (no 'RDY' within 10s)")

                # PUT header: command, file name, size.
                fname = self.outname or os.path.basename(self.filepath)
                ser.write(b"PUT\n")
                ser.write((fname + "\n").encode())
                ser.write((str(size) + "\n").encode())

                # The first non-empty line within 2.5 s must be "READY".
                ready = self._wait_line(ser, 2.5)
                if ready != "READY":
                    raise RuntimeError(f"Pi not READY (got: {ready!r})")

                if not self._send_body(ser, size):
                    # Cancelled by the user: no done callback, as before.
                    return

                # Final confirmation.
                ok = self._wait_line(ser, 2.5)
                if ok != "OK":
                    raise RuntimeError(f"Did not receive OK (got: {ok!r})")
            self.progress(100)
            self.log("Transfer complete.")
            self.done(success=True)
        except Exception as e:
            self.log(f"ERROR: {e}")
            self.done(success=False)
class AppBase:
    """Tk window for picking a port and file, sending it, and requesting a safe eject.

    SenderThread invokes its log/progress/done callbacks from the worker
    thread. Those callbacks update widgets and open message boxes, so they
    are queued with _post() and replayed on the Tk thread by a periodic
    ``after`` job instead of touching Tk from the worker thread.

    Args:
        root: the Tk root window (tk.Tk or TkinterDnD.Tk).
    """

    # Interval in milliseconds at which queued worker callbacks are replayed.
    UI_POLL_MS = 50

    def __init__(self, root):
        self.root = root
        self.root.title(APP_TITLE)
        self.stop_flag = threading.Event()
        self.sender_thread = None
        # Calls handed over from the worker thread: list of (func, args).
        self._ui_calls = []
        self._ui_lock = threading.Lock()

        main = ttk.Frame(root, padding=12)
        main.grid(sticky="nsew")
        root.columnconfigure(0, weight=1)
        root.rowconfigure(0, weight=1)

        # Row 1: Port + Refresh + Baud
        row = ttk.Frame(main)
        row.grid(sticky="ew", pady=(0, 6))
        row.columnconfigure(1, weight=1)
        ttk.Label(row, text="ポート:").grid(row=0, column=0, padx=(0, 6))
        self.port_var = tk.StringVar()
        self.port_cb = ttk.Combobox(row, textvariable=self.port_var, state="readonly", width=40)
        self.port_cb.grid(row=0, column=1, sticky="ew")
        ttk.Button(row, text="再読み込み", command=self.reload_ports).grid(row=0, column=2, padx=(6, 0))
        ttk.Label(row, text="ボーレート:").grid(row=0, column=3, padx=(16, 6))
        self.baud_var = tk.StringVar(value="1000000")
        self.baud_entry = ttk.Entry(row, textvariable=self.baud_var, width=10)
        self.baud_entry.grid(row=0, column=4)

        # Row 2: File select (DnD area + Browse)
        row2 = ttk.Frame(main)
        row2.grid(sticky="ew", pady=(0, 6))
        row2.columnconfigure(0, weight=1)
        self.file_var = tk.StringVar()
        self.dnd_label = ttk.Label(row2, text="ここにファイルをドラッグ&ドロップ(または[参照…])", relief="groove")
        self.dnd_label.grid(row=0, column=0, sticky="ew")
        ttk.Button(row2, text="参照…", command=self.browse_file).grid(row=0, column=1, padx=(6, 0))

        # Output name (optional)
        row3 = ttk.Frame(main)
        row3.grid(sticky="ew", pady=(0, 6))
        row3.columnconfigure(1, weight=1)
        ttk.Label(row3, text="保存名(任意):").grid(row=0, column=0, padx=(0, 6))
        self.outname_var = tk.StringVar()
        ttk.Entry(row3, textvariable=self.outname_var).grid(row=0, column=1, sticky="ew")

        # Progress bar
        self.pbar = ttk.Progressbar(main, mode="determinate", maximum=100)
        self.pbar.grid(sticky="ew", pady=(4, 6))

        # Buttons
        row4 = ttk.Frame(main)
        row4.grid(sticky="ew")
        ttk.Button(row4, text="送信", command=self.start_send).grid(row=0, column=0)
        ttk.Button(row4, text="中止", command=self.cancel_send).grid(row=0, column=1, padx=(6, 0))
        ttk.Button(row4, text="安全に取り外し", command=self.safe_eject).grid(row=0, column=2, padx=(12, 0))

        # Log (sixth child of ``main``, i.e. grid row 5, which takes the extra height)
        self.log_text = tk.Text(main, height=10, state="disabled")
        self.log_text.grid(sticky="nsew", pady=(6, 0))
        main.rowconfigure(5, weight=1)

        self.reload_ports()
        self.setup_dnd()
        self.root.after(self.UI_POLL_MS, self._drain_ui_calls)

    # ---------- Cross-thread hand-over ----------
    def _post(self, func, *args):
        """Queue func(*args) to be run on the Tk thread; callable from any thread."""
        with self._ui_lock:
            self._ui_calls.append((func, args))

    def _drain_ui_calls(self):
        """Run queued calls on the Tk thread, then schedule the next poll."""
        with self._ui_lock:
            pending, self._ui_calls = self._ui_calls, []
        try:
            for func, args in pending:
                func(*args)
        finally:
            # Keep polling even if one of the queued calls raised.
            self.root.after(self.UI_POLL_MS, self._drain_ui_calls)

    # ---------- UI helpers ----------
    def setup_dnd(self):
        """Enable drag-and-drop on the label when available, else click-to-browse."""
        if USE_DND and hasattr(self.root, "drop_target_register"):
            self.dnd_label.drop_target_register(DND_FILES)
            self.dnd_label.dnd_bind("<<Drop>>", self.on_drop)
        else:
            self.dnd_label.bind("<Button-1>", lambda e: self.browse_file())

    def on_drop(self, event):
        """Take the first dropped path as the file to send."""
        paths = self.root.splitlist(event.data)
        if not paths:
            return
        self.file_var.set(paths[0])
        self.dnd_label.config(text=paths[0])

    def browse_file(self):
        """Let the user pick the file to send with a file dialog."""
        path = filedialog.askopenfilename()
        if path:
            self.file_var.set(path)
            self.dnd_label.config(text=path)

    def reload_ports(self):
        """Refresh the port list and select the first entry, if any."""
        ports = list_serial_ports()
        self.port_cb["values"] = [label for label, _dev in ports]
        if ports:
            self.port_cb.current(0)

    def log(self, msg):
        """Append one line to the log box (Tk thread only)."""
        self.log_text.configure(state="normal")
        self.log_text.insert("end", msg + "\n")
        self.log_text.see("end")
        self.log_text.configure(state="disabled")
        self.root.update_idletasks()

    def set_progress(self, p):
        """Set the progress bar to p percent (Tk thread only)."""
        self.pbar["value"] = p
        self.root.update_idletasks()

    def _selected_device(self):
        """Return the device for the selected port label, or None after showing an error."""
        port_label = self.port_var.get().strip()
        if not port_label:
            messagebox.showerror("エラー", "ポートを選択してください。")
            return None
        # Re-enumerate so that a port unplugged since the last refresh is rejected.
        for label, device in list_serial_ports():
            if label == port_label:
                return device
        messagebox.showerror("エラー", "有効なポートが見つかりません。")
        return None

    # ---------- Actions ----------
    def start_send(self):
        """Validate the inputs and start a SenderThread for the chosen file."""
        if self.sender_thread and self.sender_thread.is_alive():
            messagebox.showwarning("送信中", "すでに送信中です。中止するには『中止』を押してください。")
            return
        dev = self._selected_device()
        if dev is None:
            return
        try:
            baud = int(self.baud_var.get().strip())
        except ValueError:
            messagebox.showerror("エラー", "ボーレートが不正です。例: 115200")
            return
        path = self.file_var.get().strip()
        if not path or not os.path.isfile(path):
            messagebox.showerror("エラー", "送信するファイルを指定してください。")
            return
        outname = self.outname_var.get().strip() or None

        self.set_progress(0)
        self.log(f"Start transfer: {path}")
        self.stop_flag.clear()
        # The callbacks run on the worker thread, so they only enqueue work.
        self.sender_thread = SenderThread(
            dev, baud, path, outname,
            log_cb=lambda msg: self._post(self.log, msg),
            progress_cb=lambda p: self._post(self.set_progress, p),
            done_cb=lambda success: self._post(self.on_done, success),
            stop_flag=self.stop_flag,
        )
        self.sender_thread.start()

    def cancel_send(self):
        """Ask a running transfer to stop before its next chunk."""
        if self.sender_thread and self.sender_thread.is_alive():
            self.stop_flag.set()
            self.log("Cancel requested…")

    def on_done(self, success):
        """Show the final result dialog for a transfer."""
        if success:
            messagebox.showinfo("完了", "送信が完了しました。")
        else:
            messagebox.showerror("エラー", "送信に失敗しました。")

    def safe_eject(self):
        """Send FINISH to the receiver and report whether it answered BYE."""
        # Not allowed while a transfer is running.
        if self.sender_thread and self.sender_thread.is_alive():
            messagebox.showwarning("実行不可", "送信中は取り外せません。完了後に実行してください。")
            return
        dev = self._selected_device()
        if dev is None:
            return
        try:
            with serial.Serial(dev, int(self.baud_var.get()), timeout=0.2, write_timeout=1) as ser:
                # Wait briefly (2 s) for RDY; FINISH is sent either way.
                deadline = time.monotonic() + 2.0
                while time.monotonic() < deadline:
                    if ser.readline().decode(errors="ignore").strip() == "RDY":
                        break
                ser.write(b"FINISH\n")
                # Take the first non-empty line within 5 s as the answer.
                bye = ""
                deadline = time.monotonic() + 5.0
                while time.monotonic() < deadline:
                    line = ser.readline().decode(errors="ignore").strip()
                    if line:
                        bye = line
                        break
        except Exception as e:
            messagebox.showerror("エラー", f"取り外しに失敗しました: {e}")
            self.log(f"ERROR (eject): {e}")
            return
        # The port is closed before any dialog is shown.
        if bye == "BYE":
            messagebox.showinfo("安全に取り外し", "USBを安全に取り外せます。")
            self.log("Safe eject: BYE")
        else:
            messagebox.showerror("エラー", f"取り外しに失敗しました(応答: {bye!r})")
            self.log(f"Safe eject failed: {bye!r}")
def main():
    """Build the sender window and run the Tk event loop.

    The root is a TkinterDnD.Tk when tkinterdnd2 was imported successfully,
    otherwise a plain tk.Tk.
    """
    root = TkinterDnD.Tk() if USE_DND else tk.Tk()
    AppBase(root)
    root.geometry("780x520")
    root.mainloop()


if __name__ == "__main__":
    main()
Discussion