🥒

完全自動でGoogle OAuth 2.0認証情報を取得

2024/01/24に公開

※二段階認証有効アカウント非対応

TL;DR

  • InstalledAppFlow.run_local_server()open_browser=False指定でバックグラウンド待機
  • InstalledAppFlow.authorization_url()でクエリにredirect_uriが入るまで再取得
  • undetected_chromedriver使用

コード全容

auto_oauth2.py
import os
import pickle
import shutil
import time
import traceback
from argparse import ArgumentParser, Namespace
from io import BytesIO
from pathlib import Path
from signal import SIGTERM
from tempfile import TemporaryDirectory
from threading import Thread
from urllib.parse import parse_qs, urlparse

from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from selenium.common.exceptions import (
    NoSuchElementException,
    StaleElementReferenceException,
)
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from undetected_chromedriver import Chrome


class AutoOAuth2:
    def __init__(
        self,
        user_data_dir: str,
        driver_executable_path: str,
        browser_executable_path: str,
        headless: bool = False,
    ) -> None:
        options: Options = Options()
        options.add_argument("--headless") if headless else None
        self.chrome: Chrome = Chrome(
            options=options,
            user_data_dir=user_data_dir,
            driver_executable_path=driver_executable_path,
            browser_executable_path=browser_executable_path,
        )

    def auth(
        self,
        credentials: str,
        scopes: list[str],
        email: str,
        password: str,
        port: int,
        token_file: str,
    ) -> bool:
        flow: InstalledAppFlow = InstalledAppFlow.from_client_secrets_file(
            credentials, scopes
        )
        token_bio: BytesIO = BytesIO()
        thread: Thread = Thread(
            target=self.wait_auth, args=(flow, port, token_bio), daemon=True
        )
        thread.start()

        auth_url: str = None
        while thread.is_alive() and not isinstance(auth_url, str):
            current_url: str = flow.authorization_url(port=port)[0]
            res: dict[str, list[str]] = parse_qs(urlparse(current_url).query)
            if "redirect_uri" in res.keys():
                auth_url = current_url
            else:
                time.sleep(1)
        if not isinstance(auth_url, str):
            print("auth failed")
            return False

        self.chrome.get(auth_url)
        print("email")
        email_element: WebElement = WebDriverWait(self.chrome, 15).until(
            EC.element_to_be_clickable((By.NAME, "identifier"))
        )
        email_element.send_keys(email)
        self.chrome.find_element(By.ID, "identifierNext").click()

        print("password")
        password_element: WebElement = WebDriverWait(self.chrome, 15).until(
            EC.element_to_be_clickable((By.NAME, "Passwd"))
        )
        password_element.send_keys(password)
        self.chrome.find_element(By.ID, "passwordNext").click()

        print("approve app")
        while thread.is_alive():
            try:
                result_element: WebElement = self.chrome.find_element(
                    By.TAG_NAME, "pre"
                )
                if (
                    "The authentication flow has completed. You may close this window."
                    in result_element.text
                ):
                    break
            except NoSuchElementException:
                pass

            try:
                WebDriverWait(self.chrome, 15).until(
                    EC.element_to_be_clickable((By.XPATH, '//button[@type="button"]'))
                )
                self.chrome.find_elements(By.XPATH, '//button[@type="button"]')[
                    -1
                ].click()
            except StaleElementReferenceException:
                time.sleep(1)

        while thread.is_alive():
            time.sleep(1)

        token_data: bytes = token_bio.getvalue()
        if 0 == len(token_data):
            print("get token failed")
            return False

        Path(token_file).write_bytes(token_data)
        print("token saved:", token_file)

        return True

    def wait_auth(self, flow: InstalledAppFlow, port: int, token_bio: BytesIO) -> None:
        try:
            credentials: Credentials = flow.run_local_server(
                port=port, open_browser=False
            )
            token_bio.write(pickle.dumps(credentials))
        except Exception:
            traceback.print_exc()

    def quit(self) -> None:
        print("quit")
        os.kill(self.chrome.browser_pid, SIGTERM)


def auto_oauth2(args: Namespace) -> None:
    try:
        with TemporaryDirectory(ignore_cleanup_errors=True) as dname:
            oauth: AutoOAuth2 = AutoOAuth2(
                dname, args.driver, args.chrome, headless=args.headless
            )
            try:
                print(
                    "google oauth:",
                    oauth.auth(
                        args.credentials,
                        args.scopes,
                        os.environ["GOOGLE_EMAIL"],
                        os.environ["GOOGLE_PASSWORD"],
                        args.port,
                        args.output,
                    ),
                )
            finally:
                oauth.quit()
    finally:
        shutil.rmtree(dname) if os.path.exists(dname) else None


if __name__ == "__main__":
    parser: ArgumentParser = ArgumentParser()
    parser.add_argument("driver", help="driver path")
    parser.add_argument("chrome", help="chrome path")
    parser.add_argument("credentials", help="credentials.json path")
    parser.add_argument("-s", "--scopes", nargs="+", required=True, help="scopes")
    parser.add_argument("-p", "--port", type=int, default=50056, help="server port")
    parser.add_argument("-o", "--output", default="token.pickle", help="token file")
    parser.add_argument("-hl", "--headless", action="store_true", help="headless mode")
    auto_oauth2(parser.parse_args())

使用方法

  • GCP Consoleでcredentials.jsonを事前取得(手順割愛)
  • pip install google-auth-oauthlib==1.0.0 undetected-chromedriver==3.5.3
  • 環境変数設定
GOOGLE_EMAIL=アカウント@gmail.com
GOOGLE_PASSWORD=パスワード
  • スクリプト実行
command
python スクリプトファイル.py \
  chromedriverのパス \
  chromeのパス \
  credentials.jsonのパス \
  -s スコープのURL(複数指定可)
  • 成功時、-o指定でのパス(デフォルト:token.pickle)に認証情報が保存される
  • -hをつければヘッドレス起動

説明

下記API仕様通り。

InstalledAppFlow.run_local_server()は何も引数指定しないと、勝手に認証用URLをブラウザで開いてしまうため自動操作はできなくなる。

一方で、オプション引数でopen_browser=Falseを指定することでブラウザを開かずにリダイレクト用のローカルサーバーが立ち上がり、認証待ち状態となる。

この認証待ち処理をバックグラウンドで待機or非同期処理にさせたうえで、その間に認証用URLをブラウザ自動化モジュールでアクセスすれば手動入力なしで認証情報を取得できる。

ただし、InstalledAppFlow.authorization_url()で取得するURLはリダイレクト用URLが立ち上がるまでクエリにそのURLが含まれないので、リダイレクト用URLが含まれるまで再取得を繰り返す必要がある

また、認証用URLもおそらくbot対策が施されているはずなので回避手段としてundetected_chromedriverを使用しているが、二段階認証が有効になっているとそれでも回避できず(?)SMS認証を無限に繰り返すので、本手段では対象外としている。
クリックする要素が適切でない可能性もあるが詳細未確認。

print("approve app")の部分で認証待ち処理が終わるまで最後のボタン要素を無限にクリックするようにしているが、これはデベロッパー向けアプリの場合続行画面で続行リンクの押下がうまくいかなかったため、かなり強引に処理している。
次ページの認証でも最後のボタン要素が続行なので同じ要素指定で進めることができるが、適切ではない。

Discussion