DeFi領域における各種指標データを定期的にスクレイピング&スクショ保存させる
概要
本記事では、Python(Selenium)とChromeDriverを用いて、特定のWebページからDeFi領域における各種指標データ(APY
, TVL
, プールサイズ
, 数量(Amount)
, Current/Target Weightage
, Utilization
など)をスクレイピングする方法について解説します。特にLazy Loading対策を行い、ページ全体をスクロール後にフルページスクリーンショットを取得するテクニックを紹介します。
スクレイピング対象サイト: https://jup.ag/perps-earn
使用技術
- Python 3.x
- Selenium (Python版)
- ChromeおよびChromeDriver
- chromedriver_binary(
pip install chromedriver-binary
でインストール可能)
実装の流れ
1. ドライバの起動と準備
Seleniumを使ってChromeブラウザを起動します。ChromeOptions
でヘッドレスモードにすることも可能ですが、デバッグ時には画面表示がある方が便利です。
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import chromedriver_binary
options = Options()
# options.add_argument("--headless") # 必要に応じて
driver = webdriver.Chrome(options=options)
2.目的のURLへアクセスし、遅延読み込みへの対策
Lazy Loading(遅延読み込み)がある場合、すぐにデータが表示されないことがあります。そのため、サイトにアクセスしたら数秒待機します。
driver.get("https://jup.ag/perps-earn")
import time
time.sleep(15) # ページがコンテンツを読み込むのを待機
ここで、time.sleep()
は簡易的な対策です。本格的にはWebDriverWait
を利用して特定要素が表示されるまで待機する方法もあります。
3.ページ全文テキストの抽出
Lazy Loading対応サイトでは、要素が動的に変化します。そのため、特定のDOM要素を取得する代わりに、まずはページ全体のテキストを抽出し、その中から正規表現で目的の情報を検索します。
body_element = driver.find_element_by_tag_name("body")
full_text = body_element.text.strip()
full_text
にはページに表示されている文字情報がすべて含まれます。これを元に後述の正規表現で必要な部分を抜き出します。
4.正規表現を用いたデータ抽出
APY
やTVL
、Token Name
などは、特定のフォーマットで表示されています。
以下のような正規表現を用いて取得します。
-
APY
:re.search(r"APY\s*([\d\.]+%)", full_text)
-
TVL
:re.search(r"Total Value Locked\s*\$([\d,\.]+)", full_text)
import re
apy_match = re.search(r"APY\s*([\d\.]+%)", full_text)
apy = float(apy_match.group(1).replace("%","")) if apy_match else None
tvl_match = re.search(r"Total Value Locked\s*\$([\d,\.]+)", full_text)
tvl = float(tvl_match.group(1).replace(",","")) if tvl_match else None
APY
やTVL
は記号(%
や,
)を削除した上でfloat
変換しています。
また、各トークンのデータ行も正規表現で抽出します。
例:(SOL|ETH|WBTC|USDC|USDT)
でトークン名を取得し、その後ろでPool SIze,Amount,Weightage,Utilizationを順番に取得します。
token_pattern = (
r"(SOL|ETH|WBTC|USDC|USDT)" # Token Name
r"[\s\S]*?\$([\d,\.]+)" # Pool Size ($)
r"[\s\S]*?([\d,\.]+ [A-Za-z]+)" # Amount (e.g. "123.45 SOL")
r"[\s\S]*?([\d\.%]+\s*/\s*[\d\.%]+)" # Weightage (e.g. "20% / 30%")
r"[\s\S]*?([\d\.%]+)" # Utilization (e.g. "50%")
)
tokens = re.findall(token_pattern, full_text)
re.findall()
でマッチした全てのトークン行を取得できます。その戻り値(token
)はダブルのリストになり、各トークンごとにデータがまとまっています。
5.抽出データのクレンジング
抽出したデータは文字列のままなので、%
や$
を削除し、数値へ変換します。
また、Wightageは"20% / 30%"のような文字列をCurrent Wightage
とTarget Wight age
に分けます。このようにクレンジングすることによってデータをCSVに出力する際に、セルの値が数値だけになると「N以上」、「N以下」、昇順、降順がしやすくなるという背景です。
structured_tokens = []
for t in tokens:
token_name = t[0]
pool_size_val = float(t[1].replace(",", ""))
amount_str = t[2]
amount_num_str = re.match(r"([\d\.]+)", amount_str)
amount_val = float(amount_num_str.group(1)) if amount_num_str else None
weightage_str = t[3]
weight_parts = [w.strip().replace("%","") for w in weightage_str.split("/")]
current_weight = float(weight_parts[0]) if weight_parts[0] else None
target_weight = float(weight_parts[1]) if weight_parts[1] else None
utilization_str = t[4].replace("%","")
utilization_val = float(utilization_str) if utilization_str else None
structured_tokens.append({
"Token Name": token_name,
"Pool Size ($)": pool_size_val,
"Amount": amount_val,
"Current Weightage": current_weight,
"Target Weightage": target_weight,
"Utilization": utilization_val
})
6.CSVへの書き込み
データをCSVに保存する際には、初回のみヘッダーを書き込み、その後は追記モードでデータを追加します。
import csv
import os
header = ["Date", "APY", "TVL", "Token Name", "Pool Size ($)", "Amount", "Current Weightage", "Target Weightage", "Utilization"]
csv_file = "data.csv"
write_header = not os.path.exists(csv_file)
with open(csv_file, "a+", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if write_header:
writer.writerow(header)
for token in structured_tokens:
writer.writerow([
datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
apy,
tvl,
token["Token Name"],
token["Pool Size ($)"],
token["Amount"],
token["Current Weightage"],
token["Target Weightage"],
token["Utilization"]
])
これで継続的にデータを蓄積でき、後に行う分析や可視化に利用できます。
7.フルページスクリーンショットの取得と時刻埋め込み
save_screenshot()
は現在のウィンドウ表示範囲のみをキャプチャするため、ページ全体が表示されるようにウィンドウサイズを拡大しなければなりません。そのためにはまず、ページ全体が読み込まれるまでスクロールします。またそのスクリーンショットがいつのものなのか把握するためにファイル名に時刻を埋め込みます。
previous_height = None
while True:
current_height = driver.execute_script("return document.body.scrollHeight")
if previous_height == current_height:
break
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(2)
previous_height = current_height
これで、Lazy Loadされるコンテンツをすべて読み込みます。その後、ページ全体の幅と高さを取得してウィンドウサイズを変更します。
scroll_width = driver.execute_script("return document.body.scrollWidth")
scroll_height = driver.execute_script("return document.body.scrollHeight")
driver.set_window_size(scroll_width, scroll_height)
最後にスクリーンショットを保存します。
date_str = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{date_str}_screenshot.png"
filepath = os.path.join("screenshot", filename)
driver.save_screenshot(filepath)
これでフルページのスクリーンショットが取得できます。また、スクリーンショットの名前が例えば0240101_123000_screenshot.png
といった形式で、そのスクリーンショットが「2024年1月1日12時30分00秒」に取得されたものであることが、一目でわかるようになります。
8.10分毎の定期実行
定期的なデータ取得のため、全ての処理をwhile True
ループで包み、処理が完了したらtime.sleep(600)
で10分待って次のサイクルを開始します。これで10分毎に最新データを取得し、蓄積することが可能になります。
while True:
try:
# 上記の処理(サイトアクセス、データ抽出、CSV追記、スクリーンショット取得)をここで実行
pass
except Exception as e:
print(f"An error occurred: {e}")
finally:
# 次のサイクルまで10分待機
time.sleep(600)
まとめ
- Lazy Loading対応のため、ページロード後一定時間待機や、全コンテンツが読み込まれるまでスクロールを行う。
-
正規表現を用いて
APY
,TVL
,Token Name
,Pool Size
,Amount
,Weghtage
,Utilization
などをテキストから抽出し、数値化。 - CSV保存で初回のみヘッダーを出力し、以降は追記モードで定期的なデータ蓄積を可能にする。
-
フルページスクリーンショットを実現するるため、ウィンドウサイズをページ全体に合わせて変更し、
save_screenshot
を実行。
これらのテクニックを組み合わせることで、動的に更新されるWeb pageからのデータ収集が容易になります。DeFi関連指標を定期的に蓄積、モニタリングする際などに役立ちます。
Discussion