🤖

WebDriver と Selenium を再入門:歴史と内部動作を理解する(CDP/BiDi)

に公開

はじめに

ブラウザの自動操作を行う場合にWebDriverを使うケースがあります。
WebDriverは例えば「ブラウザの操作を行いウェブドキュメント内のDOM要素を検出・操作をしたい時」の、プラットフォームや言語に依存しないインターフェイスを提供します。

WebDriverの概要図

ブラウザ非依存の WebDriver が提供するインターフェイスを介して、クライアントから送られたコマンドがブラウザ固有のプロトコルに変換され、ブラウザの操作などが行われます。

この記事では、WebDriverの歴史を見返すと共に、それらが裏でどのように動作しているかを振り返ることを目的としています。
この記事は Selenium や CDP などの自動操作の使い方を説明するものではありません。最低限、SeleniumなりPuppeteerなりでブラウザの自動操作を行った読者を対象としています。

WebDriverとSeleniumの歴史

初期のSelenium

Seleniumは2004年ごろから開発されていました。その頃、Seleniumは現在のWebDriverとは別の方法でブラウザの自動化を行っていました。
当時のSelenium1.xではブラウザのプロキシサーバーとしてSeleniumServerを動かし、JavaScriptを挿入することで自動操作の仕組みを作成しました。

SeleniumRC1

Selenium Serverは以下の役割を果たします。

  • プロキシサーバーとして動き、ページにSelenium Coreを埋め込む役割
  • テストコードからのコマンドを受け取る役割
  • Selenium Coreに対してコマンドを受け渡し、その結果を取得する役割

動きとしては以下の通りです。

  1. Selenium Serverはプロキシサーバーとしてブラウザのリクエストを仲介するようにする
  2. Selenium ServerはSelenium Coreのコードをページに埋め込む
  3. テストコード側はSelenium Serverに対してコマンドを送る
  4. Selenium Serverはそのコマンドを保持しておく。
  5. ブラウザ内の Selenium Core が Selenium Server をポーリングし、Selenium Server に新しいコマンドがあればそれを受け取る。
  6. Selenium CoreはJavaScript上でコマンドを処理して、その結果をSelenium Serverに通知する。
  7. Selenium Serverは処理されたコマンドの結果をテストコード側に返す。

このシステム構成をみてわかるとおり、複雑で低速になりがちな仕組みになっています。

参考:

Seleniumと統合前のWebDriver

Selenium1.xと同時期に、別のブラウザ自動化フレームワーク、WebDriverという名前のツールの開発が進められていました。そのWebDriverの初期コードは2007年初頭にリリースされました。
当時のSeleniumは次のような問題を抱えていました。

  • JavaScript ベースの実装によるブラウザ制約
    • IE では JavaScript から <input type="file"> の値を書き換えられない
  • API が巨大化し複雑になった

WebDriverはSeleniumとは異なるアプローチで同じ問題を解決しようとしました。
WebDriverは、ブラウザ内で実行されるJavaScriptアプリケーションではなく、ブラウザを制御するのに最も適切な仕組みを選んで利用する方針を取りました。

初期のWebDriver

初期のWebDriverでは、その「ブラウザを制御するのに最も適切な仕組み」として、ブラウザごとに次のような方式を採用していました。

  • Internet Explorer
    • COMでブラウザを操作するDLLを作成する
    • 上記DLLをJNIを介して操作する
  • Firefox
    • XPCOMでブラウザを操作するアドオンを作成しFirefoxに埋め込む
    • JsonWireProtocolを介してアドオンを操作する

WebDriverのwikiやソースコードを提供していたサイトは閉鎖され、当時のコードやアーキテクチャを確認することが難しくなっています。
http://code.google.com/p/webdriver/

webarchiveのwebdriver wiki(webarchive)や、selenium 2.xの最初期のコードから、その挙動を予測するだけとなります。

参考:

SeleniumとWebDriverの統合とW3Cによる標準化

2009年8月にはSeleniumとWebDriverの統合は始まりました
2009年12月には統合版の最初の公開(alpha1)がリリースされて、その1年半後の2011年7月に安定版がリリースされました。

同じ頃、Browser Testing and Tools Working Group CharterでWeb Driver API の標準仕様を作る動きが開始されました。
初期のドラフトでは、JsonWireProtocolを参考につくられていました。そして2018年にはW3C勧告となりました。
この頃にはJsonWireProtocolとは大きく変更されています。

この標準化により、ブラウザごとにバラバラだった WebDriver 実装のコア部分が標準化され、挙動が大きく揃えられました。もちろん標準化にない機能についてはブラウザ固有の領域がありますし、仕様の解釈違い、歴史的経緯による後方互換のための挙動のズレは存在します。それでも、当初のカオスな状態だった状況は整理されて冒頭の図のような状況になりました。

WebDriverの概要図

この頃には、WebDriverは特定のツールを表す名前ではなく、自動操作をするためのインターフェイスの名前を表すようになりました。

W3C準拠までの苦労

W3Cに準拠しましょうというのは言葉にすると楽ですが、実際には、やれといわれてすぐできるものではありませんでした。

Seleniumが完全にW3C準拠したのはSelenium4で安定版の候補(RC)は2021年になります。
Selenium3xの頃は、ブラウザドライバの応答に合わせて、JsonWireを使用するか、W3C準拠にするかを切り替えて動かしていました。

Selenium3のProtocolHandshake::createSessionでは、W3C準拠のレスポンスでなければ、JsonWireのレスポンスとみなすというような処理をしています。
https://github.com/SeleniumHQ/selenium/blob/selenium-3.150.0/java/client/src/org/openqa/selenium/remote/ProtocolHandshake.java#L124-L129
このような処理の一端から後方互換とW3C準拠の並立の苦労が伺い知ることができるかと思います。

当然、Selenium側だけでなく、ブラウザドライバ側も同じような苦労がありました。
たとえばchromedriverには2016年頃にセッション時開始時のcapabilityフラグによってW3C準拠か従来のJSON Wireに切り替わるオプションが追加されました。このフラグは2019年にW3C準拠がデフォルトの挙動となったとはいえ、まだchromedriverのコードに残ってはいます(w3c_compliantフラグ 2025/11時点)。

WebDriver BiDiの誕生

いままで説明したWebDriverはHTTP 上のリクエスト/レスポンス を前提とした通信モデルでした。
クライアントは「要素をクリックする」「URL に遷移する」といったコマンドを HTTP リクエストとして送り、ブラウザはその結果をレスポンスとして返します。
この方式は通信の主導権は常にクライアント側にあり、ブラウザ側からの任意のタイミングでのイベントプッシュすることができません。
そのため、たとえば次のようなユースケースでは扱いづらさがありました。

  • console.logの発生イベントの監視
  • ネットワークデータの受信監視
  • ダイアログが開かれた時の監視

こうしたニーズに対しては、Chrome DevTools Protocol(CDP)では、WebSocket を使用して実現するようになっていきました。
しかし、CDP はブラウザ固有のプロトコルであり全てのブラウザで使用できるようなものではありませんでした。そうしたギャップを埋めるために2020 年から双方向 WebDriverのプロトコルであるWebDriver BiDiの制定の作業が始まりました。
2022 年時点でChrome/ChromeDriver 106 と Firefox 102 の両方で、WebDriver BiDi 標準のサポートがリリースされています。

現時点でどのブラウザがWebDriver BiDiを対応しているかについては Can I use? 上で確認が可能です。
https://caniuse.com/wf-webdriver-bidi

2025/11時点でSafariは未対応

参考

ブラウザドライバーの変化の流れ

標準化の流れのなかWebDriverのインターフェイスとブラウザ固有のインターフェイスの仲介をおこなうブラウザドライバーもその形を大きく変更していく必要がありました。
たとえば、当初はdllや拡張機能として提供されていた機能は独立した実行可能ファイルとして提供されるようになりました。

実験を通じたWebDriverの裏側

ここではクラシックなWebDriverとWebDriver BiDiの操作を実際のブラウザを使用して行います。
この実験を通してWebDriverの裏で何が起きているかを確認します。

実験で行う操作内容は以下の通りです。

クラシックなWebDriverの操作

  • zennのトップページを開く
  • 検索に「test」と入力してEnterを押す
  • 記事の一覧を標準出力する

WebDriver BiDiの操作

  • Log entry added eventsページを開く
  • 「Click me for console logs」ボタンを押す
  • Consoleに「Hello World!」と表示されていることを確認する

Chrome

ChromeはChrome DevTools Protocol という機能が提供しており、要素の操作やデバッグ機能の多くがこのプロトコル経由で操作が可能になっています。
chromedriverは裏でCDPを実行して自動操作を行なっています。

Chrome DevTools Protocolを直接使用した自動操作の例

まずはCDPを使用してブラウザを自動操作する方法を確認します。
CDPを直接操作する場合は,--remote-debugging-portフラグでデバッグ用のポートを指定してChromeを起動する必要があります。

"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"   \
  --remote-debugging-port=9222  \
  --user-data-dir=/tmp/chrome-devtools-profile  \
  --no-first-run  \
  --remote-allow-origins="*"  \
  --new-window

このポートに対してwebsocketで接続することでChromeの自動操作が可能になります。

クラシックなWebDriverの操作に相当するCDPの例

まずはクラシックなWebDriverに相当する操作を確認します。

CDPを直接操作したサンプル
import sys
import json
import time
import subprocess
import urllib.request

# pip install websocket-client
import websocket

"""
前提条件:
以下のコマンドでchromeを起動済み
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"   \
  --remote-debugging-port=9222  \
  --user-data-dir=/tmp/chrome-devtools-profile  \
  --no-first-run  \
  --remote-allow-origins="*"  \
  --new-window
"""

cdp_socket = None  # type: websocket.WebSocket
cdp_id = 0


def send_cdp_command(id_: int, method: str, params: dict | None):
    """Send-CdpCommand 相当"""
    global cdp_socket
    if params is None:
        params = {}

    payload = {
        "id": id_,
        "method": method,
        "params": params,
    }
    text = json.dumps(payload)
    cdp_socket.send(text)


def receive_cdp_message_for_id(expected_id: int) -> dict:
    """Receive-CdpMessageForId 相当"""
    global cdp_socket

    while True:
        msg = cdp_socket.recv()
        #通知イベント等で壊れた JSON が来るかもしれないので try-catch 相当
        try:
            data = json.loads(msg)
        except json.JSONDecodeError:
            continue

        if isinstance(data, dict) and data.get("id") == expected_id:
            return data
        # id が違う通知 (Page.loadEventFired など) は無視してループ継続


def invoke_cdp(method: str, params: dict | None = None) -> dict:
    """Invoke-Cdp 相当: コマンド送ってレスポンス待つ"""
    global cdp_id
    cdp_id += 1
    id_ = cdp_id
    send_cdp_command(id_, method, params)
    return receive_cdp_message_for_id(id_)


def connect_cdp():
    """Connect-Cdp 相当: /json/list から WebSocket URL を引いて接続"""
    global cdp_socket, cdp_id

    with urllib.request.urlopen("http://localhost:9222/json/list") as resp:
        targets = json.load(resp)

    if not targets:
        raise RuntimeError("CDP target が見つかりませんでした。Chrome が起動しているか確認してください。")

    ws_url = targets[0]["webSocketDebuggerUrl"]
    cdp_socket = websocket.create_connection(ws_url)

    cdp_id = 0


def get_element_center(model: dict) -> tuple[float, float]:
    """Get-ElementCenter 相当: BoxModel から要素中央の座標を計算"""
    content = model.get("content", [])
    xs: list[float] = []
    ys: list[float] = []

    # [x0, y0, x1, y1, ...] を偶数/奇数インデックスで分割
    for i in range(0, len(content), 2):
        xs.append(float(content[i]))
        ys.append(float(content[i + 1]))

    cx = sum(xs) / len(xs) if xs else 0.0
    cy = sum(ys) / len(ys) if ys else 0.0
    return cx, cy


def invoke_cdp_click_by_selector(selector: str):
    """Invoke-CdpClickBySelector 相当: セレクタ指定でクリック"""
    # DOM.getDocument
    doc = invoke_cdp("DOM.getDocument", {})
    root_id = doc["result"]["root"]["nodeId"]

    # DOM.querySelector
    q = invoke_cdp("DOM.querySelector", {
        "nodeId": root_id,
        "selector": selector,
    })
    node_id = q["result"].get("nodeId")
    if not node_id:
        raise RuntimeError(f"Selector '{selector}' に一致する要素がありません。")

    # DOM.getBoxModel
    box = invoke_cdp("DOM.getBoxModel", {"nodeId": node_id})
    model = box["result"]["model"]
    cx, cy = get_element_center(model)

    # mousePressed
    invoke_cdp("Input.dispatchMouseEvent", {
        "type": "mousePressed",
        "x": cx,
        "y": cy,
        "button": "left",
        "clickCount": 1,
    })

    # mouseReleased
    invoke_cdp("Input.dispatchMouseEvent", {
        "type": "mouseReleased",
        "x": cx,
        "y": cy,
        "button": "left",
        "clickCount": 1,
    })


def wait_cdp_element_by_selector(selector: str, timeout_ms: int = 10_000, interval_ms: int = 250):
    """Wait-CdpElementBySelector 相当: セレクタで要素が現れるまで待機"""
    deadline = time.time() + timeout_ms / 1000.0

    while time.time() < deadline:
        doc = invoke_cdp("DOM.getDocument", {})
        root_id = doc["result"]["root"]["nodeId"]

        q = invoke_cdp("DOM.querySelector", {
            "nodeId": root_id,
            "selector": selector,
        })

        if q["result"].get("nodeId"):
            return True

        time.sleep(interval_ms / 1000.0)

    raise TimeoutError(f"Timeout: Selector '{selector}' が見つかりませんでした。")


def send_cdp_text(text: str):
    """Send-CdpText 相当: フォーカスされた要素にテキストを挿入"""
    invoke_cdp("Input.insertText", {"text": text})


def send_cdp_enter():
    """Send-CdpEnter 相当: Enter キー送信"""
    # keyDown
    invoke_cdp("Input.dispatchKeyEvent", {
        "type": "keyDown",
        "key": "Enter",
        "code": "Enter",
        "windowsVirtualKeyCode": 13,
        "nativeVirtualKeyCode": 13,
        "text": "\r",
    })

    # keyUp
    invoke_cdp("Input.dispatchKeyEvent", {
        "type": "keyUp",
        "key": "Enter",
        "code": "Enter",
        "windowsVirtualKeyCode": 13,
        "nativeVirtualKeyCode": 13,
        "text": "\r",
    })


def wait_cdp_text(text: str, timeout_ms: int = 10_000, interval_ms: int = 300):
    """Wait-CdpText 相当: body.innerText に指定テキストが出るまで待つ"""
    deadline = time.time() + timeout_ms / 1000.0

    while time.time() < deadline:
        # document.body.innerText.includes("...") を評価
        expr = f"document.body.innerText.includes({json.dumps(text)})"
        r = invoke_cdp("Runtime.evaluate", {
            "expression": expr,
            "returnByValue": True,
        })

        if r["result"]["result"].get("value") is True:
            print(f"Text '{text}' detected.")
            return True

        time.sleep(interval_ms / 1000.0)

    raise TimeoutError(f"Timeout waiting for text '{text}'")


def main():
    global cdp_socket

    try:

        # CDP 接続
        connect_cdp()

        # STEP 1: navigate https://zenn.dev/
        invoke_cdp("Page.navigate", {"url": "https://zenn.dev/"})

        # 表示を待つ
        wait_cdp_element_by_selector("#header-search path")

        # STEP 2: click 検索アイコン (#header-search)
        invoke_cdp_click_by_selector("#header-search")

        # STEP 3: waitForElement strong (人気のトピック)
        wait_cdp_text("人気のトピック")

        # STEP 4: click 入力フォーム (#input-search-form)
        invoke_cdp_click_by_selector("#input-search-form")

        # STEP 5: change value -> "test"
        send_cdp_text("test")
        send_cdp_enter()

        # STEP 6: 検索結果で article タグを待つ
        wait_cdp_element_by_selector("article")

        # 記事タイトル一覧を JS で取得
        js = """
Array.from(document.querySelectorAll("[class*='ArticleListItem_title']"))
    .map(x => x.innerText)
"""
        r = invoke_cdp("Runtime.evaluate", {
            "expression": js,
            "returnByValue": True,
        })

        titles = r["result"]["result"]["value"]
        if isinstance(titles, list):
            for t in titles:
                print(t)
        else:
            print("結果形式が想定外です:", titles)

    finally:
        # CDP WebSocket を閉じる
        if cdp_socket is not None:
            try:
                cdp_socket.close()
            except Exception:
                pass


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        sys.exit(1)

CDPを使用するには、http://localhost:9222/json/listにアクセスしてwebSocketDebuggerUrlを取得します。
取得したwebSocketDebuggerUrlに対してwebsocketを通じて通信を行います。

def connect_cdp():
    """Connect-Cdp 相当: /json/list から WebSocket URL を引いて接続"""
    global cdp_socket, cdp_id

    with urllib.request.urlopen("http://localhost:9222/json/list") as resp:
        targets = json.load(resp)

    if not targets:
        raise RuntimeError("CDP target が見つかりませんでした。")

    ws_url = targets[0]["webSocketDebuggerUrl"]
    cdp_socket = websocket.create_connection(ws_url)

    cdp_id = 0
    invoke_cdp("Page.enable", {})
    invoke_cdp("DOM.enable", {})

websocketが接続されたのち、次のCDPのコマンドを使用してブラウザ操作を実施します。

WebDriver BiDiの操作に相当するCDPの例

WebDriver Bidiの操作に相当する「コンソールログの出力をキャプチャする」という操作の場合は、そのキャプチャ結果の通知が非同期に行われるため前述のサンプルのような同期的な実装をすることができません。
ここでは、受信処理をおこなうスレッドを作成しておき、CDPからの受信をキューにたくわけるような修正を行なってブラウザの操作とコンソールログの出力キャプチャを実現しています。

CDPを直接操作したコンソールログのキャプチャ・サンプル
import json
import threading
import time
import queue

import requests
import websocket

CDP_HTTP = "http://localhost:9222"
TARGET_URL = "https://www.selenium.dev/selenium/web/bidi/logEntryAdded.html"


class CDPClient:
    def __init__(self, ws_url: str):
        self.ws = websocket.create_connection(ws_url)
        self._id = 0
        self._pending: dict[int, queue.Queue] = {}
        self._event_handlers: dict[str, list] = {}
        self._lock = threading.Lock()
        self._receiver_thread = threading.Thread(
            target=self._recv_loop, daemon=True
        )
        self._receiver_thread.start()

    def _next_id(self) -> int:
        with self._lock:
            self._id += 1
            return self._id

    def send(self, method: str, params: dict | None = None) -> dict:
        """
        CDP メソッドを送信し、同期的に result を待つ。
        """
        msg_id = self._next_id()
        payload = {"id": msg_id, "method": method}
        if params is not None:
            payload["params"] = params
        q_res: queue.Queue = queue.Queue()
        self._pending[msg_id] = q_res
        self.ws.send(json.dumps(payload))
        result = q_res.get()  # result が入るまで待機
        return result

    def add_event_listener(self, method: str, handler):
        """
        CDP イベントを購読する。
        """
        self._event_handlers.setdefault(method, []).append(handler)

    def _recv_loop(self):
        while True:
            try:
                raw = self.ws.recv()
            except websocket.WebSocketConnectionClosedException:
                break
            msg = json.loads(raw)

            # レスポンス
            if "id" in msg:
                q_res = self._pending.pop(msg["id"], None)
                if q_res is not None:
                    q_res.put(msg.get("result", {}))
                    continue

            # イベント
            method = msg.get("method")
            if method and method in self._event_handlers:
                params = msg.get("params", {})
                for h in list(self._event_handlers[method]):
                    try:
                        h(params)
                    except Exception:
                        pass

    def close(self):
        self.ws.close()


def get_ws_url() -> str:
    tabs = requests.get(f"{CDP_HTTP}/json/list").json()
    # すでに開いているタブがあればそれを使う。なければ /json/new で作るなど。
    if not tabs:
        new_tab = requests.get(f"{CDP_HTTP}/json/new").json()
        return new_tab["webSocketDebuggerUrl"]
    return tabs[0]["webSocketDebuggerUrl"]


def click_element_by_selector(client: CDPClient, css_selector: str):
    """
    DOM.querySelector + DOM.getBoxModel + Input.dispatchMouseEvent で
    セレクタにマッチする要素を実際にクリックする。
    """
    # DOM ツリー有効化
    client.send("DOM.enable")

    # ドキュメントの root node
    doc = client.send("DOM.getDocument")
    root_node_id = doc["root"]["nodeId"]

    # セレクタで要素を取得
    res = client.send(
        "DOM.querySelector",
        {"nodeId": root_node_id, "selector": css_selector},
    )
    node_id = res.get("nodeId")
    if not node_id:
        raise RuntimeError(f"Element not found: {css_selector}")

    # 要素の box model を取得
    box = client.send("DOM.getBoxModel", {"nodeId": node_id})
    model = box.get("model")
    if not model or "content" not in model:
        raise RuntimeError("No box model for element")

    # content は [x1, y1, x2, y2, x3, y3, x4, y4]
    content = model["content"]
    xs = content[0::2]
    ys = content[1::2]
    x = sum(xs) / len(xs)
    y = sum(ys) / len(ys)

    # マウス移動 → 押下 → 離す でクリック
    client.send(
        "Input.dispatchMouseEvent",
        {"type": "mouseMoved", "x": x, "y": y, "button": "none"},
    )
    client.send(
        "Input.dispatchMouseEvent",
        {
            "type": "mousePressed",
            "x": x,
            "y": y,
            "button": "left",
            "clickCount": 1,
        },
    )
    client.send(
        "Input.dispatchMouseEvent",
        {
            "type": "mouseReleased",
            "x": x,
            "y": y,
            "button": "left",
            "clickCount": 1,
        },
    )

def wait_for_element_by_selector(
    client: CDPClient,
    selector: str,
    timeout: float = 10.0,
    interval: float = 0.25,
) -> int:
    """
    DOM.getDocument + DOM.querySelector を使って、
    CSS セレクタにマッチする要素が現れるまで待つ。

    見つかったら nodeId を返し、見つからなければ TimeoutError を投げる。
    """
    deadline = time.monotonic() + timeout

    # DOM は一度 enable しておけば OK(呼び出し側でやっているなら省略可)
    client.send("DOM.enable")

    while time.monotonic() < deadline:
        doc = client.send("DOM.getDocument")
        root_id = doc["root"]["nodeId"]

        res = client.send(
            "DOM.querySelector",
            {"nodeId": root_id, "selector": selector},
        )
        node_id = res.get("nodeId")
        if node_id:
            return node_id

        time.sleep(interval)

    raise TimeoutError(f"Timeout: selector '{selector}' が見つかりませんでした。")

def main():
    ws_url = get_ws_url()
    client = CDPClient(ws_url)

    # Page / Runtime 有効化
    client.send("Page.enable")
    client.send("Runtime.enable")

    # console.log イベントを拾う
    log_entries: list[str] = []
    done = {"flag": False}

    def on_console(params: dict):
        args = params.get("args", [])
        texts = []
        for arg in args:
            # CDP の consoleAPICalled の引数
            if "value" in arg:
                texts.append(str(arg["value"]))
            elif "description" in arg:
                texts.append(str(arg["description"]))
        msg = " ".join(texts)
        log_entries.append(msg)
        print("console.log:", msg)
        done["flag"] = True

    client.add_event_listener("Runtime.consoleAPICalled", on_console)

    # 対象ページに遷移
    client.send("Page.navigate", {"url": TARGET_URL})
    
    # 待機
    wait_for_element_by_selector(client, "#consoleLog")

    # ★ここで CDP でちゃんとクリックする(Runtime.evaluate 不使用)
    click_element_by_selector(client, "#consoleLog")

    # ログが来るまで最大 5 秒待つ
    for _ in range(50):
        if done["flag"]:
            break
        time.sleep(0.1)

    client.close()


if __name__ == "__main__":
    main()

コンソールログのキャプチャを行う際には以下のCDPコマンドを実行しておく必要があります。

これにより、CDPからの受信データにRuntime.consoleAPICalledイベントが含まれるようになります。

chromedriverを直接操作

次にchromedriverを使用したクラシックなWebDriverとWebDriver BiDiの確認を行います。
chromedriverは裏でCDPを操作して、WebDriverのインターフェイスでブラウザを操作します。
この実験を行う場合、事前にchromedriverを起動する必要があります。

chromedriver --port=64921

chromedriverのログを詳しくみたい場合は--log-level--log-pathを指定することで詳細のログがファイルに出力されるようになります。

chromedriver --port=64921 --log-level=ALL --log-path=./chromedriver.log
chromedriverによるクラシックなWebDriverの操作

chromedriverを起動するとW3C WebDriverで定義されているエンドポイントが使用できるようになります。
このエンドポイントにアクセスすると、そのエンドポイントに応じた操作をCDPを用いて行います。

たとえばクリック操作はWebDriverのElement ClickコマンドがCDPのdispatchMouseEventコマンドで再現されていることがchromedriverのコードから読み取れます。

1.WebDriverのElement ClickがExecuteClickElementで実行されることが確認できます。

chromedriver/server/http_handler.cc

      CommandMapping(kPost, "session/:sessionId/element/:id/click",
                     WrapToCommand("ClickElement",
                                   base::BindRepeating(&ExecuteClickElement))),

2.ExecuteClickElementでCDPのdispatchMouseEventを使用してマウスの移動、ボタンを押す、ボタンを離すの一連の動作を再現していることが確認できます。

chromedriver/element_commands.cc

Status ExecuteClickElement(Session* session,
                           WebView* web_view,
                           const std::string& element_id,
                           const base::Value::Dict& params,
                           std::unique_ptr<base::Value>* value) {
// 略
  std::vector<MouseEvent> events;
  events.emplace_back(kMovedMouseEventType, kNoneMouseButton,
                      relative_location.x, relative_location.y,
                      session->sticky_modifiers, 0, 0);
  events.emplace_back(kPressedMouseEventType, kLeftMouseButton,
                      relative_location.x, relative_location.y,
                      session->sticky_modifiers, 0, 1);
  events.emplace_back(kReleasedMouseEventType, kLeftMouseButton,
                      relative_location.x, relative_location.y,
                      session->sticky_modifiers, 1, 1);
  status = containing_web_view->DispatchMouseEvents(
      events, session->GetCurrentFrameId(), false);
  // kTargetDetached could be a side effect of the click.
  if (status.IsOk() || status.code() == kTargetDetached) {
    session->mouse_position = absolute_location;
  }
  return status;
}

ではchromedriverを操作したブラウザの操作例のサンプルを確認してみます。

chromedriver直接操作によるChromeのWebDriver(クラシック)の操作サンプル
from __future__ import annotations

import time
import requests

# 事前に chromedriver --port=64921 などで起動しておく
DRIVER_URL = "http://localhost:64921"


# ---- WebDriver HTTP ラッパ ----

def new_session() -> str:
    resp = requests.post(
        f"{DRIVER_URL}/session",
        json={
            "capabilities": {
                "firstMatch": [
                    {
                        "browserName": "chrome",
                        "goog:chromeOptions": {
                            "binary": "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
                            "args": [],
                        }
                    }
                ],
            }
        },
    )
    resp.raise_for_status()
    data = resp.json()
    print("post session", data)
    return data["value"]["sessionId"]  # ChromeDriver はここに入れて返す


def navigate(session_id: str, url: str) -> None:
    resp = requests.post(
        f"{DRIVER_URL}/session/{session_id}/url",
        json={"url": url},
    )
    resp.raise_for_status()


def _extract_element_id(elem_obj: dict) -> str:
    # W3C WebDriver の element key
    return elem_obj["element-6066-11e4-a52e-4f735466cecf"]


def find_element(session_id: str, using: str, value: str) -> str:
    resp = requests.post(
        f"{DRIVER_URL}/session/{session_id}/element",
        json={"using": using, "value": value},
    )
    data = resp.json()

    # 失敗時は {"value": {"error": "no such element", ...}} の形式
    if "error" in data.get("value", {}):
        raise RuntimeError(data["value"])

    elem = data["value"]
    return _extract_element_id(elem)


def find_elements(session_id: str, using: str, value: str) -> list[str]:
    resp = requests.post(
        f"{DRIVER_URL}/session/{session_id}/elements",
        json={"using": using, "value": value},
    )
    data = resp.json()
    if "error" in data.get("value", {}):
        raise RuntimeError(data["value"])

    elems = data["value"]
    return [_extract_element_id(e) for e in elems]


def click(session_id: str, element_id: str) -> None:
    resp = requests.post(
        f"{DRIVER_URL}/session/{session_id}/element/{element_id}/click",
        json={},
    )
    resp.raise_for_status()


def get_text(session_id: str, element_id: str) -> str:
    resp = requests.get(
        f"{DRIVER_URL}/session/{session_id}/element/{element_id}/text"
    )
    resp.raise_for_status()
    return resp.json()["value"]


def is_displayed(session_id: str, element_id: str) -> bool:
    # GET /element/{id}/displayed
    resp = requests.get(
        f"{DRIVER_URL}/session/{session_id}/element/{element_id}/displayed"
    )
    resp.raise_for_status()
    return bool(resp.json()["value"])


def get_active_element(session_id: str) -> str:
    # GET /element/active
    resp = requests.get(f"{DRIVER_URL}/session/{session_id}/element/active")
    resp.raise_for_status()
    data = resp.json()
    if "error" in data.get("value", {}):
        raise RuntimeError(data["value"])
    return _extract_element_id(data["value"])


def send_keys(session_id: str, element_id: str, text: str) -> None:
    # W3C: Element Send Keys
    resp = requests.post(
        f"{DRIVER_URL}/session/{session_id}/element/{element_id}/value",
        json={"text": text},
    )
    resp.raise_for_status()


# ---- WebDriverWait 相当の「待ち」 ----

def wait_for_presence(session_id: str, using: str, value: str, timeout: float = 10.0) -> str:
    """
    Selenium: presence_of_element_located 相当
    """
    end = time.time() + timeout
    last_err: Exception | None = None

    while time.time() < end:
        try:
            return find_element(session_id, using, value)
        except Exception as e:  # no such element 等
            last_err = e
            time.sleep(0.25)  # WebDriverWait の poll_frequency 的なもの

    raise TimeoutError(f"presence timeout: {using}={value}") from last_err


def wait_for_visibility(session_id: str, using: str, value: str, timeout: float = 10.0) -> str:
    """
    Selenium: visibility_of_element_located 相当
    (presence を取ったあと GET /displayed で可視かどうかを見る)
    """
    end = time.time() + timeout
    last_err: Exception | None = None

    while time.time() < end:
        try:
            elem_id = find_element(session_id, using, value)
            if is_displayed(session_id, elem_id):
                return elem_id
        except Exception as e:
            last_err = e

        time.sleep(0.25)

    raise TimeoutError(f"visibility timeout: {using}={value}") from last_err


def wait_for_clickable(session_id: str, using: str, value: str, timeout: float = 10.0) -> str:
    """
    Selenium: element_to_be_clickable 相当
    単純化して「visibility + displayed」で代用。
    本家も「visible & enabled & click が投げられる」みたいな条件。
    """
    # ここでは visibility チェックだけを行う(enabled 判定などを足してもよい)
    return wait_for_visibility(session_id, using, value, timeout=timeout)


# ---- メイン処理:元の Selenium コードの HTTP 版 ----

def main() -> None:
    session_id = new_session()
    try:
        # STEP 1: navigate https://zenn.dev/
        navigate(session_id, "https://zenn.dev/")

        # STEP 2: 検索アイコン (#header-search) が出るまで待ってクリック
        header_search_id = wait_for_clickable(
            session_id, "css selector", "#header-search", timeout=10.0
        )
        click(session_id, header_search_id)

        # STEP 3: "人気のトピック" が表示されるまで待つ
        _popular_topic_id = wait_for_visibility(
            session_id,
            "xpath",
            "//*[text()='人気のトピック']",
            timeout=10.0,
        )

        # STEP 4: 入力フォーム (#input-search-form) をクリック
        input_form_id = wait_for_clickable(
            session_id,
            "css selector",
            "#input-search-form",
            timeout=10.0,
        )
        click(session_id, input_form_id)

        # STEP 5: "test" と入力して Enter
        #   Selenium 版と同様に「アクティブ要素」に対して送る
        active_id = get_active_element(session_id)
        # Enter は WebDriver の特殊キーで '\uE007'
        send_keys(session_id, active_id, "test\uE007")

        # STEP 6: 検索結果で article タグを待つ
        _any_article_id = wait_for_presence(
            session_id,
            "css selector",
            "article",
            timeout=10.0,
        )

        # 記事タイトル一覧を取得
        title_ids = find_elements(
            session_id, "css selector", "[class*='ArticleListItem_title']"
        )

        titles: list[str] = []
        for eid in title_ids:
            text = get_text(session_id, eid).strip()
            if text:
                titles.append(text)

        if titles:
            for t in titles:
                print(t)
        else:
            print("記事タイトルが取得できませんでした。")

    finally:
        # セッション終了
        requests.delete(f"{DRIVER_URL}/session/{session_id}")


if __name__ == "__main__":
    main()

やっていることとしてはエンドポイントをHTTPで叩いているだけです。
HTTP経由でアクセスした裏でCDPのコマンドが動作して、ブラウザの自動操作を実現することになります。

chromedriverによるWebDriver BiDiの操作

chromedriverはWebDriver BiDiのサポートもしていますが、その実現方法はクラシックなものより複雑になっています。

chromedriverはchromium-bidiというJavaScriptのライブラリを使用してCDPとBiDi間を変換します。
chromedriverはchromium-bidiをコンパイルしたmapper.jsRuntime.evaluateコマンドを使用して埋め込みます。
mapper.jsは以下のグローバル関数をwindowオブジェクトに追加します。

  • onBidiMessage
  • window.sendBidiResponse
    • mapper.jsで実行することでRuntime.bindingCalledを介してchromedriverに通知される
      chromedriverは上記のJavaScriptの関数と連携してCDPとBiDiを変換を実現することになります。

以下のサンプルはchromedriverでWebDriver BiDiを使用するサンプルになります。

chromedriver直接操作によるChromeのWebDriver BiDiの操作サンプル
import requests
import json
import websocket
import time
import threading


class WebSocket:
    def __init__(self, ws_url: str, on_console=None):
        # タイムアウトをつけておくと recv が永遠に固まりにくい
        self.ws = websocket.create_connection(ws_url, timeout=10)
        self._stop_event = threading.Event()
        self._on_console = on_console

        self._receiver_thread = threading.Thread(
            target=self._recv_loop,
            daemon=True,
        )
        self._receiver_thread.start()

    def _recv_loop(self):
        # log.entryAdded を購読
        msg_id = 1
        self.ws.send(json.dumps({
            "id": msg_id,
            "method": "session.subscribe",
            "params": {
                "events": ["log.entryAdded"],
                # 特定のタブに限定したい場合:
                # "contexts": [context_id],
            }
        }))
        print(">> subscribe log.entryAdded")

        while not self._stop_event.is_set():
            try:
                raw = self.ws.recv()
            except websocket.WebSocketConnectionClosedException:
                # close() 済みなどで切断されたら終了
                print("WebSocket closed, exiting recv loop")
                break
            except Exception as e:
                # 簡易エラー処理でよいという前提なのでログだけ
                print(f"WebSocket recv error: {e}")
                break

            if not raw:
                continue

            try:
                msg = json.loads(raw)
            except json.JSONDecodeError:
                print(f"Invalid JSON: {raw!r}")
                continue

            # イベントは "method" フィールドを持つ
            if msg.get("method") == "log.entryAdded":
                p = msg.get("params", {})

                # コールバックがあればそちらに渡す
                if self._on_console is not None:
                    try:
                        self._on_console(p)
                    except Exception as e:
                        # コールバック側の例外は潰しておく
                        print(f"on_console handler error: {e}")
                else:
                    # コールバック未指定なら標準出力に出すだけ
                    log_type = p.get("type")        # 'console' / 'javascript' など
                    level = p.get("level")          # 'info', 'error', ...
                    method = p.get("method")        # 'log', 'error', 'warn', ...
                    text = p.get("text")            # 'Hello, world!'
                    args = p.get("args", [])        # console.log の引数配列

                    print(
                        f"[log.entryAdded] "
                        f"type={log_type} level={level} "
                        f"method={method} text={text!r} args={args!r}"
                    )

    def close(self):
        # まずループ側に「終われ」と伝える
        self._stop_event.set()
        try:
            self.ws.close()
        except Exception:
            pass
        # 念のためスレッド終了を待つ(デーモンスレッドだが綺麗に閉じる)
        if self._receiver_thread.is_alive():
            self._receiver_thread.join(timeout=2)


CD_PORT = 64921


log_entries = []


def on_console(params: dict):
    args = params.get("args", [])
    texts = []
    for arg in args:
        # BiDi の console 引数
        if "value" in arg:
            texts.append(str(arg["value"]))
        elif "description" in arg:
            texts.append(str(arg["description"]))
    msg = " ".join(texts) or params.get("text", "")
    log_entries.append(msg)
    print(f"[on_console] {msg!r}")

session_url = f"http://127.0.0.1:{CD_PORT}/session"
resp = requests.post(session_url, json={
    "capabilities": {
        "firstMatch": [
            {
                "browserName": "chrome",
                "goog:chromeOptions": {
                    "binary": "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
                    "args": [],
                },
                "webSocketUrl": True,
            }
        ]
    }
})
resp.raise_for_status()
data = resp.json()
print("session create response:", data)

cap = data["value"]["capabilities"]
ws_url = cap["webSocketUrl"]   # 例: ws://localhost:28501/session/xxxx
session_id = data["value"]["sessionId"]
base = f"http://127.0.0.1:{CD_PORT}/session/{session_id}"

ws = None
try:
    ws = WebSocket(ws_url, on_console=on_console)
    print("session_id:", session_id)

    # 現在のタブのURLを変更
    r = requests.post(
        f"{base}/url",
        json={"url": "https://www.selenium.dev/selenium/web/bidi/logEntryAdded.html"}
    )
    print("url", r.status_code, r.text)

    # 要素検索(CSSセレクタ)
    r = requests.post(
        f"{base}/element",
        json={
            "using": "css selector",
            "value": "#consoleLog"
        }
    )
    print("element:", r.status_code, r.json())
    res_value = r.json()["value"]
    elem_id = list(res_value.keys())[0]
    elem_value = res_value[elem_id]
    print("element id:", elem_id, elem_value)

    # クリック
    r = requests.post(
        f"{base}/element/{elem_value}/click",
        json={}
    )
    print("click:", r.status_code, r.text)

    # ログが飛んでくるのを少し待つ
    time.sleep(1.0)
    print("log_entries:", log_entries)

finally:
    # 終了処理:WebSocket → セッション削除の順で
    if ws is not None:
        ws.close()
    try:
        requests.delete(f"{base}")
    except Exception as e:
        print(f"delete session error: {e}")

WebDriver BiDiを使用する時は、セッション作成時のcapabilitiesにwebSocketUrlを付与します。

session_url = f"http://127.0.0.1:{CD_PORT}/session"
resp = requests.post(session_url, json={
    "capabilities": {
        "firstMatch": [
            {
                "browserName": "chrome",
                "goog:chromeOptions": {
                    "binary": "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
                    "args": [],
                },
                "webSocketUrl": True,
            }
        ]
    }
})

以下のレスポンスでwebSocketUrlが取得されます。

{
  "value": {
    "capabilities": {
      // 略
      "webSocketUrl": "ws://127.0.0.1:64921/session/3cd3d268e56207260cf4017302853c83",
      // 略
    },
    "sessionId": "3cd3d268e56207260cf4017302853c83"
  }

取得したwebSocketUrlに対してwebsocketで接続後、session.subscribeを送信してコンソールログのイベントを受信するようにします。

        self.ws.send(json.dumps({
            "id": msg_id,
            "method": "session.subscribe",
            "params": {
                "events": ["log.entryAdded"],
                # 特定のタブに限定したい場合:
                # "contexts": [context_id],
            }
        }))

その後、log.entryAddedのメッセージ受信をスレッド上のループで監視します。

        while not self._stop_event.is_set():
            try:
                raw = self.ws.recv()
# 略
            # イベントは "method" フィールドを持つ
            if msg.get("method") == "log.entryAdded":
                p = msg.get("params", {})

                # コールバックがあればそちらに渡す

Seleniumでchrome操作の実験

CDPやchromedriverの操作を今まで行いましたが、seleniumはその操作を隠蔽してブラウザの操作をシンプルにしてくれます。

クラシックなWebDriverの操作

以下はseleniumのサンプルになります。実装がシンプルになっていることが確認できるかと思います。

SeleniumによるChromeのWebDriver(クラシック)の操作サンプル
from __future__ import annotations

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC


def main() -> None:
    options = Options()
    options.binary_location = "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"
    driver = webdriver.Chrome(options=options)

    wait = WebDriverWait(driver, 10)

    try:
        # STEP 1: navigate https://zenn.dev/
        # Playwright の page.goto(..., wait_until="networkidle") 相当
        driver.get("https://zenn.dev/")

        # STEP 2: 検索アイコン (#header-search) が出るまで待ってクリック
        search_icon = wait.until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, "#header-search"))
        )
        search_icon.click()

        # STEP 3: "人気のトピック" が表示されるまで待つ
        wait.until(
            EC.visibility_of_element_located(
                (By.XPATH, "//*[text()='人気のトピック']")
            )
        )

        # STEP 4: 入力フォーム (#input-search-form) をクリック
        input_form = wait.until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, "#input-search-form"))
        )
        input_form.click()

        # STEP 5: "test" と入力して Enter
        #   実際のフォーカスは内部の <input> にいるはずなので、
        #   active_element に送るほうが素直
        active = driver.switch_to.active_element
        active.send_keys("test", Keys.ENTER)

        # STEP 6: 検索結果で article タグを待つ
        wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "article")))

        # 記事タイトル一覧を取得
        #   Playwright 版と同じセレクタ [class*='ArticleListItem_title']
        title_elems = driver.find_elements(By.CSS_SELECTOR, "[class*='ArticleListItem_title']")
        titles = [e.text for e in title_elems if e.text.strip()]

        if titles:
            for t in titles:
                print(t)
        else:
            print("記事タイトルが取得できませんでした。")

    finally:
        driver.quit()


if __name__ == "__main__":
    main()

WebDriver BiDiの操作

SeleniumでWebDriver BiDiを使用する場合はドライバーのオプションの「enable_bidi」をつける必要があります。

SeleniumによるChromeのWebDriver BiDiの操作サンプル
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait

# BiDi を有効化
options = Options()
options.enable_bidi = True  # = options.set_capability("webSocketUrl", True) と同等の指定

driver = webdriver.Chrome(options=options)

try:
    driver.get("https://www.selenium.dev/selenium/web/bidi/logEntryAdded.html")

    log_entries = []

    # ★ポイント2: BiDi の script ドメイン経由でハンドラ登録
    #   以後、console.log などが発生すると log_entries.append(...) が呼ばれる
    driver.script.add_console_message_handler(log_entries.append)
    print("driver.script", driver.script)
    print("driver.script.conn.execute", driver.script.conn.execute)
    print("driver.script.conn.url", driver.script.conn.url)

    # ページ上のボタンをクリックすると console.log('Hello, world!') が実行される
    driver.find_element(By.ID, "consoleLog").click()

    # ログが届くまで待つ
    WebDriverWait(driver, 5).until(lambda _: log_entries)

    # BiDi で受け取ったログを確認
    for entry in log_entries:
        print("console.log:", entry.text)
finally:
    driver.quit()

Firefox

Firefoxは2つの自動操作の仕組みを持っています。

Marionette は geckodriver によって利用され、クラシックなWebDriverの動作をサポートします。
一方、FirefoxのWebDriver BiDiはブラウザドライバーを介さずにwebsocketで通信を行い操作が可能です。(注1)

また、一時期、Firefox で CDP が使用できましたが、2024年末にCDPのサポートは終了のアナウンスがありました。

(注1): WebDriver BiDiによる操作自体はwebsocketのアドレスを知っていれば可能ですが、通常はgeckodriver経由で、セッション確立する際にwebsocketのアドレスを取得する必要があります。

Marionette経由のFirefoxの操作例

Marionetteはgeckodriver経由で使用されるケースが多いですが、直接使用することも可能です。
以下のように--marionette--marionette-portを指定してFirefoxを起動します。

/Users/ユーザー名/.cache/selenium/firefox/mac64/145.0.1/Firefox.app/Contents/MacOS/firefox --marionette --marionette-port 2828

marionette-driverライブラリを使用することでFirefoxのmarionetteポート経由で直接操作が可能です。

marionetteでのFirefox操作例
from marionette_driver.marionette import Marionette
from marionette_driver import By, keys
from marionette_driver.errors import NoSuchElementException
import time

def wait_element_by_selector(
    client: Marionette,
    selector: str,
    timeout_ms: int = 10_000,
    interval_ms: int = 250,
):
    """
    CDP の wait_cdp_element_by_selector と同等:
    CSS セレクタで要素が見つかるまで待って WebElement を返す
    """
    deadline = time.time() + timeout_ms / 1000.0

    while time.time() < deadline:
        try:
            el = client.find_element(By.CSS_SELECTOR, selector)
            return el
        except NoSuchElementException:
            time.sleep(interval_ms / 1000.0)

    raise TimeoutError(f"Timeout: Selector '{selector}' が見つかりませんでした。")

def wait_text(
    client: Marionette,
    text: str,
    timeout_ms: int = 10_000,
    interval_ms: int = 300,
):
    """
    CDP の wait_cdp_text と同等:
    document.body.innerText に指定文字列が現れるまで待機
    """
    deadline = time.time() + timeout_ms / 1000.0

    while time.time() < deadline:
        found = client.execute_script(
            "return document.body && document.body.innerText.includes(arguments[0]);",
            script_args=[text],
        )
        if found:
            print(f"Text '{text}' detected.")
            return True

        time.sleep(interval_ms / 1000.0)

    raise TimeoutError(f"Timeout waiting for text '{text}'")

def main():
    # ★ Firefox 本体の Marionette に直で接続(geckodriver なし)
    m = Marionette(host="127.0.0.1", port=2828)
    m.start_session()
    print(m.client)

    # ページ遷移
    m.navigate("https://zenn.dev/")

    # 表示をまつ
    wait_element_by_selector(m, "#header-search path")

    # 検索ボタンを押す
    btn = m.find_element("id", "header-search")
    btn.click()

    # 人気のトピック が表示されるまでまつ
    wait_text(m, "人気のトピック")

    # キーワード入力を押す
    btn = m.find_element("id", "input-search-form")
    btn.click()

    # 入力フォームを取得
    input_el = m.find_element("id", "input-search-form")

    # "test" 入力 → Enter
    input_el.send_keys("test")
    input_el.send_keys(keys.Keys.ENTER)

    # <article>が表示されるまで待機
    wait_element_by_selector(m, "article")

    # 記事タイトル一覧を JS で取得
    js = """
return (Array.from(document.querySelectorAll("[class*='ArticleListItem_title']"))
    .map(x => x.innerText));
"""
    titles = m.execute_script(js)
    if isinstance(titles, list):
        for t in titles:
            print(t)
    else:
        print("結果形式が想定外です:", titles)

if __name__ == "__main__":
    main()

WebDriver BiDiを経由のFirefoxの操作

Firefoxの自動操作でWebDriver BiDiを使用する場合には--remote-debugging-portでポートを指定する必要があります。

例:

/Users/ユーザー/.cache/selenium/firefox/mac64/145.0.1/Firefox.app/Contents/MacOS/firefox --remote-debugging-port 51328 --remote-allow-hosts localhost -foreground -no-remote

上記のコマンドでFirefoxを起動した場合、ws://127.0.0.1:51328/sessionにwebsocketで接続することでWebDriver BiDiを使用した操作が可能になります。

WebDriver BiDiでのFirefox操作例
import json
import threading
import time
import queue

import websocket  # pip install websocket-client


BIDI_WS_URL = "ws://127.0.0.1:51328/session"
TARGET_URL = "https://www.selenium.dev/selenium/web/bidi/logEntryAdded.html"


class BiDiClient:
    def __init__(self, ws_url: str):
        # ★ Origin ヘッダを送ると Firefox Remote Agent に拒否されるので suppress_origin=True
        self.ws = websocket.create_connection(ws_url, suppress_origin=True)
        self._id = 0
        self._lock = threading.Lock()
        self._pending: dict[int, queue.Queue] = {}
        self._handlers: dict[str, list] = {}

        self._receiver_thread = threading.Thread(
            target=self._recv_loop,
            daemon=True,
        )
        self._receiver_thread.start()

    def _next_id(self) -> int:
        with self._lock:
            self._id += 1
            return self._id

    def send_command(self, method: str, params: dict | None = None) -> dict:
        msg_id = self._next_id()
        payload = {"id": msg_id, "method": method}
        if params is not None:
            payload["params"] = params

        q: queue.Queue = queue.Queue()
        self._pending[msg_id] = q
        self.ws.send(json.dumps(payload))
        # Firefox 側のレスポンス (success/error) 丸ごと受け取る
        res = q.get()
        return res

    def add_event_listener(self, method: str, handler):
        self._handlers.setdefault(method, []).append(handler)

    def _recv_loop(self):
        while True:
            try:
                raw = self.ws.recv()
            except websocket.WebSocketConnectionClosedException:
                break
            if not raw:
                print('.....exit')
                return
            msg = json.loads(raw)

            # コマンド応答
            if "id" in msg:
                q = self._pending.pop(msg["id"], None)
                if q is not None:
                    # Firefox の BiDi 返信: { "type": "success", "id":..., "result": {...} }
                    q.put(msg.get("result", msg.get("error", {})))
                continue

            # イベント: { "type": "event", "method": "...", "params": {...} }
            m = msg.get("method")
            if m and m in self._handlers:
                params = msg.get("params", {})
                for h in list(self._handlers[m]):
                    try:
                        h(params)
                    except Exception:
                        pass

    def close(self):
        try:
            self.ws.close()
        except Exception:
            pass


def find_context_id(bidi: BiDiClient) -> str:
    res = bidi.send_command("browsingContext.getTree", {"maxDepth": 1})
    ctxs = res.get("contexts", [])
    if not ctxs:
        raise RuntimeError("No browsingContext found")
    return ctxs[0]["context"]


def navigate(bidi: BiDiClient, context_id: str, url: str):
    bidi.send_command("browsingContext.navigate", {
        "context": context_id,
        "url": url,
        "wait": "complete",
    })


def locate_node_by_css(bidi: BiDiClient, context_id: str, css: str) -> dict:
    res = bidi.send_command("browsingContext.locateNodes", {
        "context": context_id,
        "locator": {"type": "css", "value": css},
        "maxNodeCount": 1,
        "serializationOptions": {
            "maxDomDepth": 0,
            "maxObjectDepth": 0,
            "includeShadowTree": "none",
        },
    })
    nodes = res.get("nodes", [])
    if not nodes:
        raise RuntimeError(f"Element not found: {css}")
    return nodes[0]


def node_to_element_origin(node_remote: dict) -> dict:
    shared_ref: dict = {}
    if node_remote.get("sharedId") is not None:
        shared_ref["sharedId"] = node_remote["sharedId"]
    elif node_remote.get("handle") is not None:
        shared_ref["handle"] = node_remote["handle"]
    else:
        raise RuntimeError("NodeRemoteValue has neither sharedId nor handle")

    return {
        "type": "element",
        "element": shared_ref,
    }


def click_element(bidi: BiDiClient, context_id: str, element_origin: dict):
    bidi.send_command("input.performActions", {
        "context": context_id,
        "actions": [
            {
                "type": "pointer",
                "id": "mouse",
                "parameters": {"pointerType": "mouse"},
                "actions": [
                    {
                        "type": "pointerMove",
                        "duration": 0,
                        "origin": element_origin,
                        "x": 0,
                        "y": 0,
                    },
                    {"type": "pointerDown", "button": 0},
                    {"type": "pointerUp", "button": 0},
                ],
            }
        ],
    })


def main():
    bidi = BiDiClient(BIDI_WS_URL)

    try:
        # 1. BiDi セッション開始
        session_res = bidi.send_command("session.new", {
            "capabilities": {"alwaysMatch": {}}
        })
        print("session.new:", session_res)

        # 2. console.log を購読
        bidi.send_command("session.subscribe", {
            "events": ["log.entryAdded"],
        })

        log_entries: list[str] = []
        done = {"flag": False}

        def on_log(params: dict):
            entry = params.get("entry", params)
            text = entry.get("text")
            log_entries.append(text)
            print("[log.entryAdded]", text)
            done["flag"] = True

        bidi.add_event_listener("log.entryAdded", on_log)

        # 3. 既存タブの browsingContext を1つ拾う
        context_id = find_context_id(bidi)
        print("context:", context_id)

        # 4. ページ遷移
        navigate(bidi, context_id, TARGET_URL)

        # 5. 要素 locate → click(script.evaluate は使わない)
        node = locate_node_by_css(bidi, context_id, "#consoleLog")
        origin = node_to_element_origin(node)
        click_element(bidi, context_id, origin)

        # 6. console.log が飛んでくるのを少し待つ
        for _ in range(50):
            if done["flag"]:
                break
            time.sleep(0.1)

        print("log_entries:", log_entries)

        # 一応セッションを閉じるなら
        bidi.send_command("session.end", {})

    finally:
        bidi.close()


if __name__ == "__main__":
    main()

geckodriverを経由のFirefoxの操作例

Marionetteを直接操作せずに、geckodriverを使用することでWebDriverとMarionette間の変換を行いFirefoxを動作させることができます。
まずgeckodriverを事前に起動しておきます。

/usr/local/bin/geckodriver --port 4444

gekodriverを使用する場合、chromedriverで行ったクラシックなWebDriverの操作サンプルの流用可能です。
接続先情報とcapabilitiesの設定を変更することでFirefoxの自動操作になります。

DRIVER_URL = "http://localhost:4444"


# ---- WebDriver HTTP ラッパ ----

def new_session() -> str:
    resp = requests.post(
        f"{DRIVER_URL}/session",
        json={
            "capabilities": {
                "firstMatch": [
                    {
                        "browserName": "firefox",
                        #"goog:chromeOptions": {
                        #    "binary": "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
                        #    "args": [],
                        #}
                        "moz:firefoxOptions": {
                            # ★ Firefox 本体のパスを明示
                            "binary": "/Users/ユーザー/.cache/selenium/firefox/mac64/145.0.1/Firefox.app/Contents/MacOS/firefox",
                            "args": [],
                        },
                    }
                ]
            }
        },
    )

geckodriverからWebDriver BiDiを使用する場合はセッション作成時のcapabilitiesにwebSocketUrlフラグを付与します。

def new_session() -> str:
    resp = requests.post(
        f"{DRIVER_URL}/session",
        json={
            "capabilities": {
                "firstMatch": [
                    {
                        "browserName": "firefox",
                        "moz:firefoxOptions": {
                            # ★ Firefox 本体のパスを明示
                            "binary": "/Users/ユーザ/.cache/selenium/firefox/mac64/145.0.1/Firefox.app/Contents/MacOS/firefox",
                            "args": [],
                        },
                        "webSocketUrl": True,
                    },
                ]
            }
        },
    )

このレスポンスにwebSocketUrlがあるので、WebDriver BiDiを経由のFirefoxの操作で使用したコードのBIDI_WS_URLを置き換えて実行すればWebDriver BiDiを経由した操作が可能になります。

この時の geckodriver の役割は、Firefox を起動して WebDriver セッションを確立し、WebDriver BiDi 用 WebSocket の URL を取得してクライアントに伝えることです。
実際の websocket 通信は、クライアントと Firefox の間で行われます。

Seleniumのケース

Chromeで使用していたコードを流用可能です。
FirefoxをSeleniumで使用するケースではwebdriver.Firefoxに置き換えてください。

options = Options()
options.enable_bidi = True
driver = webdriver.Firefox(options=options)

他ツールのブラウザ自動化方法

ここでは他のブラウザ自動化ツールがどのようにしてブラウザを自動操作しているかを簡単に確認してみます。

  • Puppeteer
    • JavaScriptライブラリ
    • Chrome
      • デフォルトはCDP / オプションでWebDriver BiDi
    • Firefox
      • WebDriver BiDi
  • PlayWright
    • JavaScript。Python, .NET, Javaでも使用できる。
    • ブラウザにパッチを当ててテストできるようにしている
      • ChromeはCDPのみ対応できているらしく、ブラウザを改変するパッチはない
    • WebDriver BiDiをサポートしようとはしているが課題がのこっている
  • Cypress
    • NodeJsで作成したデスクトップアプリからブラウザを起動して操作する
    • ブラウザ起動時にプロキシを経由で接続する。これにより、ブラウザにCypressの処理を埋め込む
    • ブラウザ側とCypressのアプリ側はWebSocketで通信してテストを行う
    • 参考1, 参考2

これらのツールではWebDriver自体は使っていませんが、ブラウザドライバーが裏でやっていることや、初期のSeleniumのようなJavaScriptの埋め込み方法を使用して実現していることがわかるかと思います。

まとめ

今回はWebDriverとSeleniumの歴史の確認と、それが裏側でどのように動作しているかを確認しました。
Seleniumで簡単にできていることの裏側で、なにが起きているかの一端が確認できたかと思います。
これを知ることで今後新しいブラウザの自動操作ツールがでてきても、裏でなにが起きているかを想像することは容易になるかと思います。

Discussion