Amazon注文を管理する簡易Webアプリの作り方(Python)
Amazon注文を管理する簡易Webアプリの作り方(Python)
今回は、Amazonの注文を管理する簡易Webアプリの構築方法を紹介します。このアプリは、Pythonで動作する軽量なツールで、注文確認や履歴管理ができます。何度も購入する商品をNFCタグに書いておいて、スマホでかざして利用することを想定していますが、PCやスマホから直接URLを叩いても使えますよ!
アプリの概要
このWebアプリは、NFCタグに書き込んだ商品のASIN(Amazon Standard Identification Number)を使って、注文確認や履歴管理を行うものです。以下のような特徴があります:
- NFCタグで簡単操作:商品ごとのNFCタグをスマホで読み込むと、注文確認ページにジャンプ。
- 注文履歴の管理:過去の注文を参照したり、商品名や到着日を追加可能。
- シンプルな設計:WindowsのPython 3.1x環境で動作し、LAN内で簡単にアクセス可能。
- Amazon規約を考慮:注文の自動化は規約違反のため、手動注文を前提に設計。
注意:このアプリは注文を自動化するものではありません。Amazonの規約上、注文処理は手動で行う必要があります。
動作の流れ
- NFCタグの準備
商品ごとのNFCタグに以下の形式でURLを書き込みます(PCのIPアドレスは例として192.168.0.2を使用)。
http://192.168.0.2/[8桁のASIN]
例: http://192.168.0.2/B01N5IB20Q
-
NFCタグをスキャン
スマホでNFCタグを読み込むと、注文確認ページ(confirm.html
)が表示されます。- 初回注文の場合は「初回注文」と表示。
- 2回目以降は前回の注文日時を表示。
- 選択肢として「はい」「商品ページを確認」「いいえ」の3つのボタンが表示されます。
-
ボタンの動作
- はい:注文履歴に追加し、Amazonの商品ページに遷移。
- 商品ページを確認:履歴には追加せず、Amazonの商品ページに遷移。
-
いいえ:注文履歴ページ(
history.html
)に遷移。
- 注文履歴ページ
- 過去の注文を一覧で確認可能。
- 商品名、注文日時、単価、数量、合計額、到着日を編集・保存可能。
- 削除フラグを立てると、履歴はグレーアウト(論理削除)。
- 履歴はCSVでエクスポート可能(インポートは未対応)。
必要な環境
- OS:Windows 11
- Python:3.1x以上
- ネットワーク:LAN内で動作(例: PCのIPアドレスが
192.168.0.2
) - NFCタグ:商品ごとのURLを書き込めるもの(オプション)
構築手順
以下の手順でアプリをセットアップします。コマンドはWindowsのcmd
で実行してください。
1. ディレクトリ作成と仮想環境の設定
mkdir amzOrder
cd amzOrder
python3 -m venv amzOrder
.\amzOrder\Scripts\activate
python.exe -m pip install --upgrade pip
2. 必要なライブラリのインストール
pip install fastapi uvicorn jinja2 python-dotenv pydantic python-multipart
3. ファイルとディレクトリの作成
mkdir templates
touch main.py amzorder.env templates/history.html templates/confirm.html
4. ソースコードの配置
以下に、4つのファイル(main.py
, amzorder.env
, history.html
, confirm.html
)の内容を記載します。コピー&ペーストして保存してください。
main.py
import sqlite3
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional
from fastapi import FastAPI, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse, PlainTextResponse
from fastapi.templating import Jinja2Templates
from dotenv import load_dotenv
import os
# ログ設定
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler("amzorder.log"),
logging.StreamHandler()
],
force=True
)
logger = logging.getLogger(__name__)
# 環境変数
load_dotenv("amzorder.env")
DB_FILE = os.getenv("DB_FILE", "purchases.db")
PORT = int(os.getenv("PORT", 8000))
# FastAPIアプリ
app = FastAPI(title="amzOrder")
templates = Jinja2Templates(directory="templates")
# データベース初期化
def init_db():
with sqlite3.connect(DB_FILE) as conn:
c = conn.cursor()
c.execute("""
CREATE TABLE IF NOT EXISTS purchases (
id INTEGER PRIMARY KEY AUTOINCREMENT,
asin TEXT NOT NULL,
url TEXT NOT NULL,
order_date TEXT NOT NULL,
delivery_date TEXT,
quantity INTEGER,
unit_price INTEGER,
product_name TEXT,
status TEXT NOT NULL,
is_deleted INTEGER DEFAULT 0
)
""")
conn.commit()
logger.info("Database initialized or already exists")
init_db()
# ルートURLのリダイレクト
@app.get("/", response_class=RedirectResponse)
async def redirect_to_history():
logger.info("GET /, redirecting to /history")
return RedirectResponse(url="/history", status_code=303)
# 履歴表示
@app.get("/history", response_class=HTMLResponse)
async def history(request: Request, filter: Optional[str] = None, sort: Optional[str] = "order_date_desc"):
logger.info(f"GET /history, filter: {filter}, sort: {sort}")
with sqlite3.connect(DB_FILE) as conn:
c = conn.cursor()
query = """
SELECT id, asin, url, order_date, delivery_date, quantity, unit_price, product_name, is_deleted
FROM purchases
"""
params = []
if filter:
query += " WHERE product_name LIKE ?"
params.append(f"%{filter}%")
if sort == "order_date_asc":
query += " ORDER BY order_date ASC, product_name ASC"
elif sort == "product_name_asc":
query += " ORDER BY product_name ASC, order_date DESC"
elif sort == "product_name_desc":
query += " ORDER BY product_name DESC, order_date DESC"
else: # order_date_desc (default)
query += " ORDER BY order_date DESC, product_name ASC"
c.execute(query, params)
records = [
{
"id": row[0],
"asin": row[1],
"url": row[2],
"order_date": row[3],
"delivery_date": row[4],
"quantity": row[5],
"unit_price": row[6],
"product_name": row[7] or "-",
"is_deleted": row[8],
"total_amount": (row[5] * row[6]) if row[5] is not None and row[6] is not None else "-"
}
for row in c.fetchall()
]
logger.info(f"Retrieved {len(records)} records")
error_message = request.query_params.get("error", None)
return templates.TemplateResponse(
"history.html",
{
"request": request,
"records": records,
"filter": filter or "",
"sort": sort,
"error_message": error_message
}
)
# CSVエクスポート (GET)
@app.get("/history/export", response_class=PlainTextResponse)
async def export_csv():
logger.info("GET /history/export")
with sqlite3.connect(DB_FILE) as conn:
c = conn.cursor()
c.execute("""
SELECT id, url, order_date, delivery_date, quantity, unit_price,
CASE WHEN quantity IS NOT NULL AND unit_price IS NOT NULL THEN quantity * unit_price ELSE '-' END as total_amount,
is_deleted
FROM purchases
ORDER BY order_date ASC
""")
records = c.fetchall()
csv_content = "Row,URL,Order Date,Delivery Date,Quantity,Unit Price,Total Amount,Deleted\n"
for idx, row in enumerate(records, 1):
delivery_date = row[3] or "-"
quantity = str(row[4]) if row[4] is not None else "-"
unit_price = str(row[5]) if row[5] is not None else "-"
total_amount = str(row[6]) if row[6] != "-" else "-"
is_deleted = "Deleted" if row[7] == 1 else ""
csv_content += f"{idx},{row[1]},{row[2]},{delivery_date},{quantity},{unit_price},{total_amount},{is_deleted}\n"
csv_content = "\ufeff" + csv_content # UTF-8 BOM
logger.info(f"Exported {len(records)} records")
return PlainTextResponse(content=csv_content, headers={"Content-Disposition": "attachment; filename=order_history.csv"})
# CSVエクスポート (HEAD)
@app.head("/history/export")
async def export_csv_head():
logger.info("HEAD /history/export")
return PlainTextResponse(content="", headers={"Content-Disposition": "attachment; filename=order_history.csv"})
# 確認画面
@app.get("/{asin}", response_class=HTMLResponse)
async def confirm_order(request: Request, asin: str):
logger.info(f"GET /{asin}")
with sqlite3.connect(DB_FILE) as conn:
c = conn.cursor()
c.execute("""
SELECT order_date, product_name
FROM purchases
WHERE asin = ?
ORDER BY order_date DESC LIMIT 1
""", (asin,))
last_order = c.fetchone()
last_order_date = last_order[0] if last_order else None
product_name = last_order[1] if last_order and last_order[1] else "不明"
logger.info(f"Last order for ASIN {asin}: date={last_order_date}, product_name={product_name}")
is_first_order = last_order_date is None
return templates.TemplateResponse(
"confirm.html",
{
"request": request,
"asin": asin,
"product_name": product_name,
"last_order_date": last_order_date or "初回注文",
"is_first_order": is_first_order,
"amazon_url": f"https://www.amazon.co.jp/dp/{asin}"
}
)
# 注文処理
@app.post("/order/{asin}", response_class=RedirectResponse)
async def process_order(asin: str, action: str = Form(...)):
logger.info(f"POST /order/{asin}, action: {action}")
if action == "yes":
jst = timezone(timedelta(hours=9)) # JST (+9時間)
order_date = datetime.now(jst).strftime("%Y-%m-%d %H:%M:%S")
with sqlite3.connect(DB_FILE) as conn:
c = conn.cursor()
c.execute("""
SELECT product_name
FROM purchases
WHERE asin = ?
ORDER BY order_date DESC LIMIT 1
""", (asin,))
last_order = c.fetchone()
product_name = last_order[0] if last_order and last_order[0] else None
c.execute("""
INSERT INTO purchases (asin, url, order_date, product_name, status, is_deleted)
VALUES (?, ?, ?, ?, 'pending', 0)
""", (asin, f"https://www.amazon.co.jp/dp/{asin}", order_date, product_name))
conn.commit()
logger.info(f"Pending order recorded for ASIN {asin} at {order_date}, product_name={product_name}")
return RedirectResponse(url=f"https://www.amazon.co.jp/dp/{asin}", status_code=303)
elif action == "check":
return RedirectResponse(url=f"https://www.amazon.co.jp/dp/{asin}", status_code=303)
else:
return RedirectResponse(url="/history", status_code=303)
# 編集
@app.post("/history/update/{id}", response_class=RedirectResponse)
async def update_record(
id: int,
product_name: Optional[str] = Form(None),
delivery_date: Optional[str] = Form(None),
quantity: Optional[str] = Form(None),
unit_price: Optional[str] = Form(None),
is_deleted: Optional[int] = Form(None)
):
logger.info(f"POST /history/update/{id}, data: product_name={product_name}, delivery_date={delivery_date}, quantity={quantity}, unit_price={unit_price}, is_deleted={is_deleted}")
try:
product_name = product_name.strip() if product_name and product_name.strip() else None
delivery_date = delivery_date.strip() if delivery_date and delivery_date.strip() else None
quantity_value = None
if quantity and quantity.strip():
try:
quantity_value = int(quantity)
except ValueError:
logger.warning(f"Invalid quantity for id {id}: {quantity}")
unit_price_value = None
if unit_price and unit_price.strip():
try:
unit_price_value = int(unit_price)
except ValueError:
logger.warning(f"Invalid unit_price for id {id}: {unit_price}")
is_deleted_value = is_deleted if is_deleted is not None else 0
with sqlite3.connect(DB_FILE) as conn:
c = conn.cursor()
c.execute("""
UPDATE purchases
SET product_name = ?, delivery_date = ?, quantity = ?, unit_price = ?, status = 'confirmed', is_deleted = ?
WHERE id = ?
""", (product_name, delivery_date, quantity_value, unit_price_value, is_deleted_value, id))
if c.rowcount == 0:
logger.warning(f"No record updated for id {id}, not found")
conn.commit()
logger.info(f"Record {id} updated")
except Exception as e:
logger.error(f"Error updating record {id}: {str(e)}")
return RedirectResponse(url="/history?error=更新に失敗しました", status_code=303)
return RedirectResponse(url="/history", status_code=303)
if __name__ == "__main__":
import uvicorn
logger.info(f"Starting server on port {PORT}")
uvicorn.run(app, host="0.0.0.0", port=PORT)
amzorder.env
# amzOrder設定
PORT=8000
DB_FILE=purchases.db
LOG_FILE=amzorder.log
history.html
<!DOCTYPE html>
<html>
<head>
<title>amzOrder - 注文履歴</title>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
table { width: 100%; border-collapse: collapse; }
th, td { border: 1px solid #000; padding: 5px; }
.pending { background-color: #f5f5f5; }
.deleted { background-color: #eee; }
.error { margin-bottom: 5px; }
</style>
<script>
function toggleDelete(checkbox, rowId) {
const row = document.getElementById(rowId);
const form = document.getElementById('update-form-' + rowId.split('-')[1]);
if (checkbox.checked) {
row.classList.add('deleted');
form.querySelector('input[name="is_deleted"]').value = 1;
} else {
row.classList.remove('deleted');
form.querySelector('input[name="is_deleted"]').value = 0;
}
}
</script>
</head>
<body>
{% if error_message %}
<p class="error">{{ error_message }}</p>
{% endif %}
<form method="get" action="/history">
<input type="text" name="filter" value="{{ filter }}" placeholder="商品名でフィルタ">
<select name="sort">
<option value="order_date_desc" {% if sort == "order_date_desc" %}selected{% endif %}>注文日時 新しい順</option>
<option value="order_date_asc" {% if sort == "order_date_asc" %}selected{% endif %}>注文日時 古い順</option>
<option value="product_name_asc" {% if sort == "product_name_asc" %}selected{% endif %}>商品名 昇順</option>
<option value="product_name_desc" {% if sort == "product_name_desc" %}selected{% endif %}>商品名 降順</option>
</select>
<button type="submit">適用</button>
</form>
<a href="/history/export">CSVエクスポート</a>
<table>
<thead>
<tr>
<th>行</th>
<th>商品名</th>
<th>URL</th>
<th>注文日時</th>
<th>単価</th>
<th>個数</th>
<th>合計額</th>
<th>到着日</th>
<th>削除</th>
<th>保存</th>
</tr>
</thead>
<tbody>
{% for record in records %}
<tr id="row-{{ record.id }}"
{% if not record.delivery_date %}class="pending"{% endif %}
{% if record.is_deleted %}class="deleted"{% endif %}>
<td>{{ loop.index }}</td>
<td>
<form id="update-form-{{ record.id }}" action="/history/update/{{ record.id }}" method="post">
<input type="text" name="product_name" value="{{ record.product_name }}">
<input type="hidden" name="is_deleted" value="{{ 1 if record.is_deleted else 0 }}">
</td>
<td><a href="{{ record.url }}">{{ record.url | replace("https://www.amazon.co.jp/dp/", "") }}</a></td>
<td>{{ record.order_date }}</td>
<td>
<input type="number" name="unit_price" value="{{ record.unit_price or '' }}" step="1">
</td>
<td>
<input type="number" name="quantity" value="{{ record.quantity or '' }}" step="1">
</td>
<td>{{ record.total_amount }}</td>
<td>
<input type="date" name="delivery_date" value="{{ record.delivery_date or '' }}">
</td>
<td>
<input type="checkbox" class="delete-checkbox"
onchange="toggleDelete(this, 'row-{{ record.id }}')"
{% if record.is_deleted %}checked{% endif %}>
</td>
<td>
<button type="submit" form="update-form-{{ record.id }}">保存</button>
</form>
</td>
</tr>
{% endfor %}
</tbody>
</table>
</body>
</html>
confirm.html
<!DOCTYPE html>
<html>
<head>
<title>amzOrder - 注文確認</title>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
body { font-family: sans-serif; }
.error { color: red; }
</style>
</head>
<body>
<h1>注文確認</h1>
<p>商品: {{ product_name }}</p>
<p>ASIN: {{ asin }}</p>
<p>前回注文日: {{ last_order_date }}</p>
{% if is_first_order %}
<p>この商品は初めての注文です。</p>
{% else %}
<p>この商品を再度注文しますか?</p>
{% endif %}
<form action="/order/{{ asin }}" method="post">
<button type="submit" name="action" value="yes">はい</button>
<button type="submit" name="action" value="check">商品ページを確認</button>
<button type="submit" name="action" value="no">いいえ</button>
</form>
</body>
</html>
5. サーバーの起動
以下のコマンドでサーバーを起動します。
uvicorn main:app --host 0.0.0.0 --port 8000
- サーバーはLAN内で
http://192.168.0.2:8000/
(IPアドレスは環境に応じて変更)でアクセス可能。 - 初回起動時にWindows Firewallやブラウザのセキュリティ警告が出る場合があります。
- 常駐させたい場合は、コマンドプロンプトを最小化して実行し続けてください。
6. 再度起動する場合
サーバーを停止した後、再度起動する場合は以下のコマンドを実行します。
cd amzOrder
.\amzOrder\Scripts\activate
uvicorn main:app --host 0.0.0.0 --port 8000
使い方のポイント
スマホ対応:UIはスマホに最適化していませんが、PC・スマホどちらでも動作します。履歴ページは横に長いため、PCでの閲覧がおすすめ。
- 履歴管理:商品名や到着日は任意で入力可能。削除は論理削除(グレーアウト)で、誤削除を防ぎます。
- CSVエクスポート:履歴をCSVで出力可能。ただし、インポート機能は未実装。
- データベースのリセット:初期化したい場合は、サーバーを停止しpurchases.dbを削除してください。
注意点とカスタマイズ
- パフォーマンス:大量のデータでの動作確認はしていません。必要に応じてソースコードを改造してください。
- セキュリティ:LAN内での利用を想定しており、認証機能はありません。外部公開する場合はセキュリティ対策を追加してください。
- カスタマイズの自由度:ソースコードは公開しているので、UIや機能を自由に改良できます。たとえば、商品名の自動取得やスマホ向けUIの最適化などが考えられます。
最後に
このアプリは、NFCタグを使った簡単な注文管理ツールとして実用性を目指しました。興味がある方はぜひ試してみてください!何か改善点やアイデアがあれば、ソースコードをいじって自分好みにカスタマイズしてみてくださいね。それでは、楽しいNFCライフを!
Discussion