📽️

HLS形式の配信動画をローカルに保存する

2024/06/30に公開

TL;DR

  • Selenium + undetected_chromedriver使用
  • ChromeOptions.set_capability("goog:loggingPrefs", {"performance": "ALL"})を設定し、Chrome.get_log("performance")でブラウザのネットワークログを解析してストリームデータのURLを1件ずつ構築し、requestsで保存
  • ffmpeg-pythonで結合
  • スクリプト
  • 配信ページへのアクセスにログインが必要な場合は先にログインしておく
  • ストリームデータ取得に認証が必要なページ非対応

スクリプト全容

スクリプト

全処理をmain()にべた書き。

hls.py
import json
import os
import time
import traceback
from argparse import ArgumentParser, Namespace
from datetime import datetime
from pathlib import Path
from subprocess import Popen
from tempfile import TemporaryDirectory
from urllib.parse import ParseResult, urlparse

import ffmpeg
import m3u8
import requests
from m3u8 import M3U8, Playlist
from m3u8.model import MalformedPlaylistError
from requests import RequestException
from undetected_chromedriver import Chrome, ChromeOptions


def main() -> None:
    bin_ext: str = ".exe" if os.name == "nt" else ""

    parser: ArgumentParser = ArgumentParser()
    parser.add_argument(
        "-b", "--browser", default=rf".\chrome\chrome{bin_ext}"
    )
    parser.add_argument("-d", "--driver", default=rf".\chromedriver{bin_ext}")
    parser.add_argument("-u", "--user_data", default=r".\userData")
    parser.add_argument("-s", "--save_dir", default=r".\saved")
    parser.add_argument("-v", "--video_ext", default=".ts")
    parser.add_argument("-a", "--audio_ext", default=".aac")
    parser.add_argument("-f", "--ffmpeg_path", default="ffmpeg")

    args: Namespace = parser.parse_args()

    date_for_path: str = datetime.now().strftime("%Y%m%d_%H%M%S")

    save_root_dir: Path = Path(args.save_dir) / date_for_path
    save_dirs: dict[str, Path] = {
        d: save_root_dir / d for d in ["video", "audio"]
    }

    base_urls: dict[str, str] = {key: None for key in save_dirs}
    media_playlists: dict[str, M3U8] = {}

    user_data_dir: Path = Path(args.user_data)
    if not user_data_dir.is_dir():
        user_data_dir.mkdir(parents=True, exist_ok=True)

    options: ChromeOptions = ChromeOptions()
    options.add_argument("--log-level=ALL")
    options.add_argument(f"--user-data-dir={user_data_dir.absolute()}")
    options.set_capability("goog:loggingPrefs", {"performance": "ALL"})

    driver: Chrome = Chrome(
        options=options,
        browser_executable_path=args.browser,
        driver_executable_path=args.driver,
    )

    try:
        user_agent: str = driver.execute_script("return navigator.userAgent;")
        requests_headers: dict[str, str] = {
            "User-Agent": user_agent,
        }
        target_url: str = input("input target page url: ")

        print("get hls start", target_url)
        driver.get(target_url)
        start: float = time.time()
        while (
            0 < len([v for v in base_urls.values() if not isinstance(v, str)])
            and time.time() - start < 60.0
        ):
            perf_logs: list[dict[str, dict]] = driver.get_log("performance")
            for perf_log in perf_logs:
                message: dict[str, dict[str, dict]] = json.loads(
                    perf_log.get("message", "{}")
                ).get("message", {})
                url: str = (
                    message.get("params", {})
                    .get("response", {})
                    .get("url", "")
                )

                if url.endswith(".m3u8"):
                    try:
                        playlist: M3U8 = m3u8.loads(
                            requests.get(url, headers=requests_headers).text
                        )
                        parsed: ParseResult = urlparse(url)
                        base_path: str = "/".join(parsed.path.split("/")[:-1])
                        base_url: str = (
                            f"{parsed.scheme}://{parsed.hostname}{base_path}/"
                        )

                        if 0 < len(playlist.playlists):
                            playlists: list[Playlist] = list(
                                playlist.playlists
                            )
                            hq_playlist: Playlist = playlists[0]
                            for p in playlists[1:]:
                                if (
                                    hq_playlist.stream_info.bandwidth
                                    or 0 < p.stream_info.bandwidth
                                    or 0
                                ):
                                    hq_playlist = p

                            hq_parsed: ParseResult = urlparse(hq_playlist.uri)
                            playlist = m3u8.loads(
                                requests.get(
                                    f"{base_url}{hq_playlist.uri}"
                                    if "" == hq_parsed.scheme
                                    else hq_playlist.uri,
                                    headers=requests_headers,
                                ).text
                            )

                        playlist_files: list[str] = playlist.files
                        media_type: str = None

                        if 0 < len(
                            [
                                p
                                for p in playlist_files
                                if p.endswith(args.video_ext)
                            ]
                        ) and not isinstance(base_urls["video"], str):
                            media_type = "video"
                        elif 0 < len(
                            [
                                p
                                for p in playlist_files
                                if p.endswith(args.audio_ext)
                            ]
                        ) and not isinstance(base_urls["audio"], str):
                            media_type = "audio"

                        if isinstance(media_type, str):
                            base_urls.update({media_type: base_url})
                            media_playlists.update({media_type: playlist})

                    except (RequestException, MalformedPlaylistError):
                        traceback.print_exc()

            time.sleep(1)

        print("get hls end", target_url)

        if 0 == sum([len(pl.files) for pl in media_playlists.values()]):
            print("hls media not found")
            return
    except Exception:
        traceback.print_exc()
    finally:
        driver.quit()

    saved_files: dict[str, list[Path]] = {key: [] for key in base_urls}

    for media_type, base_url in base_urls.items():
        for media in media_playlists.get(media_type, M3U8()).files:
            media_path: Path = save_dirs[media_type] / media
            media_parsed: ParseResult = urlparse(media)
            media_url: str = (
                f"{base_url}{media}" if "" == media_parsed.scheme else media
            )
            try:
                if not media_path.parent.is_dir():
                    media_path.parent.mkdir(parents=True, exist_ok=True)
                print("save start", media_url, media_path)
                media_path.write_bytes(
                    requests.get(media_url, headers=requests_headers).content
                )
                saved_files[media_type].append(media_path.absolute())
                print("save end", media_url, media_path)
            except (RequestException, OSError):
                traceback.print_exc()

    if 0 == sum([len(p) for p in saved_files.values()]):
        print("no saved hls media files")
        return

    try:
        concat_media: Path = (save_root_dir / date_for_path).with_suffix(
            args.video_ext
            if 0 < len(media_playlists.get("video", M3U8()).files)
            else args.audio.ext
        )
        print("concat start", concat_media)
        with TemporaryDirectory() as tmp_dir:
            tmp_path: Path = Path(tmp_dir)
            list_paths: dict[str, Path] = {
                key: tmp_path / f"{key}.txt" for key in media_playlists
            }
            for media_type, list_path in list_paths.items():
                if 0 == len(saved_files[media_type]):
                    continue
                list_path.write_text(
                    "\n".join(
                        [f"file '{p}'" for p in saved_files[media_type]]
                    ),
                    encoding="utf-8",
                )
            proc: Popen = ffmpeg.run_async(
                ffmpeg.output(
                    *(
                        [
                            ffmpeg.input(
                                f"{p}",
                                f="concat",
                                safe=0,
                            )
                            for p in list_paths.values()
                            if p.is_file()
                        ]
                    ),
                    f"{concat_media}",
                    **{
                        c: "copy"
                        for v, c in dict(
                            video="vcodec", audio="acodec"
                        ).items()
                        if v in list_paths
                    },
                    threads=os.cpu_count(),
                ),
                cmd=f"{args.ffmpeg_path}",
                pipe_stdout=True,
                pipe_stderr=True,
                overwrite_output=True,
            )
            for std in [proc.stderr, proc.stdout]:
                for out in std:
                    try:
                        print(out.decode().rstrip("\n"))
                    except UnicodeDecodeError:
                        pass
        print("concat end", concat_media)
    except Exception:
        traceback.print_exc()


if __name__ == "__main__":
    main()

使用方法

1. 事前準備

必要なPythonモジュールを適宜インストールする。

  • ffmpeg-python
  • m3u8
  • requests
  • undetected_chromedriver

2. スクリプト起動

起動時に引数を各種設定して起動。

command
python hls.py

3. 対象ページアクセスのためのログイン (任意)

対象ページによってはHLSデータ取得のためにログインが必要な場合がある。
その際はSeleniumによりChromeが起動した後、手動でログインして事前にHLSデータが取得できる状態にしておく。
※場合によっては対象ページに1回アクセスしておく必要もある。

4. 対象ページURLの指定

スクリプト実行コンソールにinput target page url: と出るので対象ページURLをコンソールへ入力する。※ブラウザのURLバーではない。

4. HLSデータ取得 (自動)

動画データあるいは音声データが見つかったら保存先に順次保存されていく。

5. 結合ファイル保存 (自動)

HLS全データ取得後、保存されたデータをもとに保存先へ結合ファイルが保存される。

動作仕様

引数

  • -b
    Chromeブラウザの実行パス。デフォルトは.\chrome\chrome.exe (Windowsの場合)
  • -d
    Chromedriverの実行パス。デフォルトは.\chromedriver.exe
  • -u
    ブラウザのデータ保存ディレクトリ。デフォルトは.\userData
  • -s
    動画・音声データの保存ディレクトリ。デフォルトは.\saved
  • -v
    動画データの拡張子。デフォルトは.ts
  • -s
    音声データの拡張子。デフォルトは.aac
  • -f
    ffmpegの実行パス。デフォルトはffmpeg

保存先

引数-sで指定したディレクトリ下に実行日時のサブディレクトリを作成し、その中に保存する。
動画データは.\videoに、音声データは.\audioに保存される。
結合したファイルは実行日時のサブディレクトリ直下に実行日時.{-vで指定した拡張子}で保存される。

ストリーム情報取得

Chrome.get_log("performance")でパフォーマンスログを1件ずつ解析する。
動画と音声の両方の情報取得、あるいはタイムアウトになるまで解析し続ける。
タイムアウトは1分

プレイリストの解析

パフォーマンスログからURLを取り出し、末尾が.m3u8となっているURLを解析対象とする。
URLをGETしてm3u8モジュールに読み込ませた後、M3U8.playlistsが空でない場合は複数品質のプレイリストがあると判断し、playlists.StreamInfo.bandwidthの値を比較して最大値のプレイリストを選択しURLを構築・GETして再度m3u8モジュールに読み込ませる。
M3U8.files内に動画の拡張子ファイルが1つ以上ある場合は動画のプレイリストとみなす。
音声の拡張子ファイルが1つ以上ある場合は音声のプレイリストとみなす。

Seleniumの終了

動画と音声の両方の情報取得、あるいはタイムアウトした時点でSeleniumを終了する。
もし動画も音声も取得できなかった場合はスクリプトも終了する。

ストリームデータの保存

プレイリストのURLをベースURLとして、M3U8.files内のファイルからストリームデータのURLを構築し、requestsモジュールで保存先に保存する。
この際、Seleniumで起動したChromeブラウザのUser-Agentをヘッダに渡している。(あまり意味はない)
またCookieなど認証情報は付与していないので、ストリームデータのURLアクセスに認証が必要な場合は未対応

結合ファイルの作成

ffmpeg-pythonを介して、ffmpegで結合する。
スレッド数はos.cpu_count()で取得した値を設定している。(copyの場合は効果ない?)
単純結合しかしていないため音声がずれたり結合部分にノイズが入ることがある。
以下のような警告ログが大量に出る。未調査。

[mpegts @ 00000201742e6600] start time for stream 1 is not set in estimate_timings_from_pts
    Last message repeated 1 times
[mpegts @ 00000201742e68c0] start time for stream 1 is not set in estimate_timings_from_pts
[aac @ 00000201741f6440] Estimating duration from bitrate, this may be inaccurate
[aac @ 00000201741f5940] Estimating duration from bitrate, this may be inaccurate
[aac @ 00000201741f5c00] Estimating duration from bitrate, this may be inaccurate
[mpegts @ 000002017236cd00] Non-monotonic DTS in output stream 0:2; previous: 96799667, current: 96798813; changing to 96799668. This may result in incorrect timestamps in the output file.

ffmpegm3u8形式のURL直接指定をサポートしているのでそれでも良かったが、ffmpegからの直アクセスが必ずしも対応しきれているとも思えなかったので未採用。

参考

Discussion