💮

Webスクレイピング→pandas→SQLiteのデータパイプラインを初めて作った話

に公開

Webスクレイピング→pandas→SQLiteのデータパイプラインを初めて作った話

なぜ作ったか

SQLやデータマネジメントは大学で習ったが、インターンの準備として思い出す必要が出てきたので、SQLの構文を思い出しつつ、Webスクレイピングなるものもやってみた。

パイプラインの全体像

将来的には、Microsoft環境で使うことになるので、全体像はこんな感じ。
(Webサイト)
↓ スクレイピング(Python: requests / BeautifulSoup / Playwright)
(生データ(HTML→DataFrame))
↓ クレンジング・変換(pandas)
(整形済みデータ)
↓ 格納(SQLAlchemy / pyodbc)
(Microsoft Fabric SQL Database)
↓ 分析・可視化
(Power BI / Excel / Notebook)

ステップ1: スクレイピング

静的Webサイト

# 静的なWebページ用
import requests # HTMLを取得
from bs4 import BeautifulSoup # 必要な・特定の情報を取得
import pandas as pd

# 1. ページ取得
url = "http://books.toscrape.com/"
response = requests.get(url)

print(f"ステータスコード: {response.status_code}") # 200なら成功
response.encoding = response.apparent_encoding # 文字化け防止
print(f"文字コード: {response.encoding}")

# よくあるエラー
# status_code: 403 アクセス拒否 → User-Agent ヘッダーをつける

# headers = {
#     "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
# }
# response = requests.get(url, headers=headers)
#
# 文字化け 文字コードのズレ → response.encoding = "utf-8" を明示

# 2. HTMLをパース
# コンピュータが理解しやすいようにHTMLのテキストデータを解析し、
# 構造化されたデータに変換すること
soup = BeautifulSoup(response.text, "lxml")
#print(soup)
# 3. 本のデータを取得
books = []

# 取りたい要素(本のタイトルなど)の上で 右クリック → 「検証」/調査
for article in soup.find_all("article", class_="product_pod"):
    title = article.h3.a["title"]
    price = article.find("p", class_ = "price_color").text
    rating = article.p["class"][1]
    availability = article.find("p", class_ = "instock availability").text.strip() # .strip()は改行や空白を取り除く
    if availability == "In stock":
        availability = "O"
    else:
        availability = "X"

    books.append({
        "title": title,
        "price": price,
        "rating": rating,
        "availability": availability,
    })

# 4. DataFrameにして表示
# Pandasの「DataFrame型」は、
# 表形式のデータ(行と列で構成されるデータ)を扱うための基本的なデータ構造
# PythonでExcelのような表データを扱いたいときに便利
df = pd.DataFrame(books)
print(df)
print(f"\n取得件数: {len(df)}件")

要素の確認のやりかた
① ブラウザで右クリック→検証

② HTMLの構造を目で読む
「どのタグに入ってる?」
「classやidは何?」

③ BeautifulSoupで取り方を考える
タグで取る → soup.find("p")
classで取る → soup.find("p", class_="price_color")
属性を取る → tag["href"] / tag["title"]
テキストを取る → tag.text

BeautifulSoupの主な取り方(早見表)

やりたいこと コード
最初の1個だけ取る soup.find("h2")
全部取る soup.find_all("li")
classで絞る soup.find("p", class_="price")
idで絞る soup.find("div", id="main")
属性の値を取る tag["href"]tag["title"]
テキストを取る tag.text または tag.get_text()
子要素に降りる article.h3.a(チェーンできる)

複数ページの取得

import requests
from bs4 import BeautifulSoup
import pandas as pd
import time

BASE_URL = "http://books.toscrape.com/catalogue/page-{}.html"
all_books = []

for page_num in range(1, 6):
    url = BASE_URL.format(page_num)
    response = requests.get(url)
    response.encoding = response.apparent_encoding
    soup = BeautifulSoup(response.text, "lxml")

    for article in soup.find_all("article", class_="product_pod"):
        title = article.h3.a["title"]
        price = article.find("p", class_="price_color").text
        rating = article.p["class"][1]

        all_books.append(
            {
                "title": title,
                "price": price,
                "rating": rating,
            }
        )

    print(f"ページ {page_num} 完了: 累計 {len(all_books)} 件")
    time.sleep(1)

df = pd.DataFrame(all_books)
df.to_csv("books.csv", index=False)  # index=False 行番号を消す
# CSV: カンマで区切られたデータが行単位で並ぶ形式
print(f"\n合計 {len(df)} 件を books.csv に保存しました")

動的Webサイト

from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup
import pandas as pd
import time

with sync_playwright() as p:
    browser = p.chromium.launch(headless=True) # headless=Falseにするとブラウザが目に見えて動く
    page = browser.new_page()

    page.goto("http://books.toscrape.com/")

    # JS描画が終わるまで待つ
    page.wait_for_selector("article.product_pod")

    # 描画後のHTMLをBeautifulSoupに渡す
    html = page.content()
    soup = BeautifulSoup(html, "lxml")

    # あとはBeautifulSoupと同じ
    books = []
    for article in soup.find_all("article", class_="product_pod"):
        title = article.h3.a["title"]
        price = article.find("p", class_="price_color").text
        books.append({"title": title, "price": price})
    
    browser.close()

df = pd.DataFrame(books)
print(df)

BeautifulSoupとの違いはほぼここだけ:

BeautifulSoup Playwright
HTML取得 requests.get(url) page.goto(url)
HTML解析 BeautifulSoup(response.text) BeautifulSoup(page.content())
JS待機 できない wait_for_selector()

ステップ2: SQLiteに保存

import sqlite3
import pandas as pd

# CSVの読み込み
df = pd.read_csv("books.csv")

# 価格の£を除いてfloatに変換(DBに数値として保存するため)
df["price"] = df["price"].str.replace("£", "").astype(float)

print(df.dtypes) # 型を確認
print(df.head(3))

# DB接続(存在しない場合は新規作成)
# conn/con: データベースへの「接続口」(connectionの略)
conn = sqlite3.connect("books.db")

# DataFrameをそのままテーブルに書き込む
df.to_sql(
    name = "books",
    con = conn,
    if_exists = "replace",
    index = False
)

print("保存完了")

# 確認:SQLで読み直す
result = pd.read_sql("SELECT * FROM books LIMIT 5", conn)
print(result)

conn.close() # これを忘れるとDBファイルがロックされたままになる

# # close()を自動でやってくれる
# with sqlite3.connect("books.db") as conn:
#     df.to_sql("books", con=conn, if_exists="replace", index=False)
# # ← withブロックを出ると自動でclose()

conn = sqlite3.connect("books.db")
cursor = conn.cursor() # 実際にSQLを実行する「窓口」

# 評価がThreeの本を価格の高い順に表示
cursor.execute("""
    SELECT title, price, rating
    FROM books
    WHERE rating = "Three"
    ORDER BY price DESC
    LIMIT 5
"""
)

rows = cursor.fetchall() # execute() で検索した結果を全部まとめて取ってくる
for row in rows:
    print(row)

conn.close()

SQL構文説明

SELECT title, price, rating    -- ① 取得するカラムを指定
FROM books                     -- ② どのテーブルから
WHERE rating = 'Three'         -- ③ 絞り込み条件
ORDER BY price DESC            -- ④ 並び替え
LIMIT 5                        -- ⑤ 上位5件だけ

① SELECT

SELECT *              -- 全カラム
SELECT title, price   -- 指定したカラムだけ

② FROM(エクセルでいう、シート名みたいな)

FROM books   -- テーブル名

③ WHERE(絞り込み)

WHERE rating = 'Three'          -- 文字列は''で囲む
WHERE price > 30                -- 数値はそのまま
WHERE price > 30 AND rating = 'Three'  -- AND で複数条件
WHERE rating = 'Three' OR rating = 'Four'  -- OR も使える

④ ORDER BY(並び替え)

ORDER BY price DESC   -- 高い順(降順)
ORDER BY price ASC    -- 安い順(昇順)、ASCは省略可

⑤ LIMIT(件数制限)

LIMIT 5    -- 上位5件だけ返す

実行順序(重要!)

「まずテーブルを決めて、絞り込んで、並べて、件数を切って、最後に列を選ぶ」というイメージ

他によく使うSQL

-- 件数を数える
SELECT COUNT(*) FROM books;

-- 平均・最大・最小
SELECT AVG(price), MAX(price), MIN(price) FROM books;

-- グループ別に集計(評価ごとの平均価格)
SELECT rating, AVG(price), COUNT(*)
FROM books
GROUP BY rating
ORDER BY AVG(price) DESC;

詰まったところ

cursor.fetchall() とは
execute() で検索した結果を全部まとめて取ってくるメソッド。

  • fetchone()1件だけ
  • fetchmany(5)指定した件数
  • fetchall()全件

注意

robots.txtを確認

url/robots.txtにはスクレイピングの許可範囲やルールが示されています。

Disallow: /全面禁止 → スクレイピングしない
Disallow: /cart/そのパスだけ禁止 → 避ける
何も書いてない / Allow: /許可 → OK
Crawl-delay: その秒数をtime.sleep()に使う

流れとしては、
① robots.txt を確認
↓ 404の場合
② サイトの利用規約(Terms of Service)を確認
↓ 商用利用禁止など書いてある場合
③ スクレイピング禁止の可能性 → 避けるか許可を取る

time.sleep()の設定

スクレイピングにおいて time.sleep(1) を入れているのは、サーバー負荷への配慮として必要です。間隔は人間のアクセス頻度などにあわせるとよいらしいです。

Discussion