🎥

YouTube Data API+Supabase Edge Functions+pg_cronでYouTubeの任意の検索結果を蓄積する

2024/08/13に公開

やりたいこと

  1. ある検索ワードに対するYouTubeの検索結果を取得して
  2. いい感じにラベリングして
  3. いい感じにソート・フィルタしてフロントで表示する

例)

  1. 「作業用BGM」のYouTube検索結果を取得して
  2. titleやdescriptionを基に「著作権フリー」「{ジャンル}」等のラベリングをして
  3. 「著作権フリー」の動画のみをジャンル毎に表示する

本記事では、1を実現するまでに検討した内容や遭遇したエラーについてまとめます。
※2と3については触れません(1がなんとかなればなんとかなる気がするので)。

YouTubeの検索結果を取得する方法

YouTube Data APIによって実現可能です。
https://developers.google.com/youtube/v3/docs/search/list?hl=ja

ただし、例によって1日に使用できる上限という概念が存在します。

YouTube Data APIの場合、ユニットという単位で割り当てが設定されており、そこからオペレーション毎に異なる割り当てコストを消費する仕組みになっています。そして、デフォルトの割り当ては10,000ユニット/1日、検索リクエストの費用は100ユニット/1回です。
つまり、仮にページ遷移時にAPIリクエストする場合、100人が訪れるだけでAPIの使用上限に達してしまいます。

さらに、YouTube Data APIが1回のリクエストで返却するアイテムの数(maxResults)は最大50件であり、次の50件を取得したい場合はレスポンスに含まれるnextPageTokenを新しいリクエスト時にpageTokenとして渡してあげる必要があります。
当然、このリクエストでも100ユニットの費用を消費します。

さらにさらに、少し前の記事ですが「取得件数が500件を超えるとnextPageTokenが返却されなくなる」という事象も確認されているようです。
https://zenn.dev/jqinglong/articles/1161615fdaa6f6

上記を踏まえた上で、実装方法を検討します。

選択肢1: YouTube Data APIをそのまま使用する方法

上記のリスクを踏まえた上で、YouTube Data APIをそのまま使用する案です。

  • メリット
    • 実装コストが低い(のですぐにリリースでき、反応が悪かった時も捨てやすい)
  • デメリット
    • ユニットの割り当てを消費しきってしまうと機能が破綻する(し、ほぼ確実に発生する)

実装コストが低い点は大きな魅力に感じますが、簡単に見積もっても割り当て使用量を消費しきることはほぼ確実なので実際は選択肢になり得ません。

なお、割り当て自体を増やすことは可能ですが、これは実装コストが低いメリットを消してしまうことになるので今回は考慮しません。

選択肢2: 再検証時間を設ける等してリクエスト回数を抑える

Next.jsにおけるrevalidate等を使用する案です。

  • メリット
    • 実装コストが低い(選択肢1とそこまで変わらない気がする)
    • 割り当ての消費による破綻を回避できる
  • デメリット
    • 再検証タイミングにリクエストしたユーザーの体験は悪くなる
    • 再検証時間の間隔が長いと、反映までにラグが発生してしまう
    • 再検証時間の間隔が短いと、ユニットの割り当てを消費しきって破綻する
    • 検索内容ごとに適切な再検証時間は異なる
    • 検索内容次第だが、毎回のレスポンスにはそこまで差分がない印象

選択肢1とそこまで変わらない実装コストでリクエスト回数を抑えることができるため、「とりえあずリリースして反応を見たい」要求にもある程度耐えることができそうです。

しかし、再検証時間の間隔は長くても短くても課題が残る上に、本来であれば「検索結果件数が多い→人気→更新頻度を上げたい」のに対して、「割り当てを消費する可能性があるので再検証間隔を長くする→更新頻度が下がる」という仕組みにせざるを得ない点がかなり気になります。

検索内容や再検証時間の間隔にもよるものの、差分より重複が多くなるであろう検索結果を毎回取得する点もいまいちです。

そもそもの話として、nextPageTokenの恐らく適切な使い方は所謂ページネーションや「もっと見る」的な用途な気がしていて、無理やり全件取得してラベリング→表示制御する際に毎回リクエストするというのは明らかにベストプラクティスではない気がします...

選択肢3: 検索結果をDBに蓄積する

大人しく検索結果をDBに蓄積する案です。

  • 時間窓を絞って検索する(直近1時間にアップロードされた動画を検索する)
  • ↑を時間窓と同じ実行間隔で定期実行する(1時間に1回リクエストする)

を行えば、効率よくデータを蓄積できる気がします。

  • メリット
    • 選択肢1, 2で挙げた課題を解決できる
    • 時間窓を絞ることで検索結果件数の肥大化を防げる
    • 割り当てのユニットの消費を推測しやすい
  • デメリット
    • 実装コストが少なくとも選択肢1, 2よりは大きい
    • supabaseを使う予定だけど、別にsupabaseも使いたい放題というわけではない
    • 蓄積以降に更新された情報を反映するためには別の仕組みが必要

当初に考えていた実装よりやや話が膨らんでしまう点が気になりますが、選択肢3で進めることにします。

定期実行する方法

Next.js+Vercel+Supabaseで開発する予定なので、

のどちらを使用するか検討しました。

結論としては

  • Vercelは今後使用しない開発を行うかも
  • Supabaseは今後も使用したい
  • DBにINSERTしていく関数や設定になるので、Supabaseの方に置いておく方がまとまりが良さそう

という理由から、今回は後者の構成を使用することにしました。

YouTube Data API+Supabase Edge Functions+pg_cronでYouTubeの任意の検索結果を蓄積する方法

事前準備

YouTube Data APIの有効化とAPI Keyの作成

Google CloudからYouTube Data APIを有効化し、API Keyを作成します。
詳細な手順は既に多くの方がまとめているので省略します。

Supabaseのテーブルの作成

検索内容と期間を設定するsearch_keywordテーブルと、検索結果を蓄積していくsearch_resultテーブルを作成しておきます。

また、search_keywordテーブルにはSELECT, search_resultテーブルにはSELEECTとINSERTのRLS Policyを設定します。

create table
  public.search_keyword (
    id bigint generated by default as identity not null,
    created_at timestamp with time zone not null default now(),
    keyword character varying null,
    starts_at timestamp with time zone null,
    ends_at timestamp with time zone null,
    constraint search_keyword_pkey primary key (id)
  ) tablespace pg_default;
create table
  public.search_result (
    id bigint generated by default as identity not null,
    created_at timestamp with time zone not null default now(),
    item json null,
    search_keyword_id bigint null,
    constraint search_result_pkey primary key (id),
    constraint search_result_search_keyword_id_fkey foreign key (search_keyword_id) references search_keyword (id)
  ) tablespace pg_default;

ローカル開発環境の準備

Supabase Edge Functionsの公式ドキュメントに倣って進めていきます。

プロジェクトを初期化して…

supabase init

Edge functionを作成します。

supabase functions new search-youtube-cron-job

その後、下記のコマンドを実行して、functionsをローカル環境で起動します。

supabase start # start the supabase stack
supabase functions serve # start the Functions watcher

最後にcurlを実行し動作を確認できれば、開発の準備は完了です。

curl --request POST 'http://localhost:54321/functions/v1/search-youtube-cron-job' \
  --header 'Authorization: Bearer SUPABASE_ANON_KEY' \
  --header 'Content-Type: application/json' \
  --data '{ "name":"Functions" }'

定期実行するコードの実装

リポジトリです。
https://github.com/nih4shi/search-youtube-cron-job

1. 検索内容と期間を取得する

search_keywordテーブルに格納されている「どの検索内容を」「いつからいつまで取得する」という情報を取得します。

今回の実装では、定期実行自体はsearch_keywordに格納されている検索期間によらず行い、定期実行タイミングに基づいて決定される時間窓と検索期間が重なる場合のみYouTube Data APIにリクエストします。

※図中では複数の検索を平行して行っていますが、実際はユニットの消費の関係もあるのであまり想定していなかったりします。

 * YouTube search window: start
 * @returns
 */
const getPublishedBefore = (): Date => {
  const now = new Date()
  return new Date(
    now.getFullYear(),
    now.getMonth(),
    now.getDate(),
    now.getHours(),
    0, // minute
    0, // second
    0 // ms
  )
}

/**
 * YouTube search window: end
 * @returns
 */
const getPublishedAfter = (): Date => {
  const now = new Date()
  return new Date(
    now.getFullYear(),
    now.getMonth(),
    now.getDate(),
    now.getHours() - 1,
    0, // minute
    0, // second
    0 // ms
  )
}

/**
 * get for valid searches from supabase table
 * @returns
 */
const fetchSupabaseSearchKeywords = async (): Promise<Array<SupabaseSearchKeywordRecord> | []> => {
  const { data, error } = await supabase
    .from('search_keyword')
    .select('*')
    .lte('starts_at', getPublishedBefore().toISOString())
    .gte('ends_at', getPublishedAfter().toISOString())

  if (!data) return []

  return data
}

Deno.serve(async (req) => {
  try {
    // 1. get search keyword and period for YouTube Data API from supabase
    const resSearchKeywords: Array<SupabaseSearchKeywordRecord> =
      await fetchSupabaseSearchKeywords()
    if (!resSearchKeywords.length) {
      console.log('No search keywords.')
      return new Response()
    }
    ...
 

2. Supabaseの認証を行う

search_resultテーブルは認証済のアカウントのみINSERTを許可するようにしているので、このタイミングで認証を行います。

// 2. supabase authentication
const { data: authUser, error: authError } = await supabase.auth.signInWithPassword({
  email: Deno.env.get('RLS_SUPABASE_EMAIL') ?? '',
  password: Deno.env.get('RLS_SUPABASE_PASSWORD') ?? '',
})
if (authError) throw 'Supabase authentication failed.'

3. YouTube Data APIにリクエストして、検索結果を取得する

公式ドキュメントに記載されている通り、https://www.googleapis.com/youtube/v3/searchにパラメータを付与してリクエストします。

注意点として、時間窓を絞るために使用するpublishedAfter/publishedBeforeはUTCで判断されます。

/**
 * YouTube Data API query parameter
 */
const queryParams: QueryParams = {
  part: 'snippet',
  type: 'video',
  q: '', // search keyword
  publishedBefore: getPublishedBefore().toISOString(),
  publishedAfter: getPublishedAfter().toISOString(),
  maxResults: '50',
  order: 'date',
  pageToken: '',
  key: Deno.env.get('GOOGLE_API_KEY') ?? '',
}

/**
 * search YouTube
 * @param keyword
 * @param nextPageToken
 * @returns
 */
const fetchSearchYouTube = async (
  supabaseSearchKeywordRecord: SupabaseSearchKeywordRecord,
  nextPageToken: string = ''
): Promise<{ id: number; items: ResponseYouTubeSearchList['items'] }> => {
  const paramStr: URLSearchParams = new URLSearchParams({
    ...queryParams,
    q: supabaseSearchKeywordRecord.keyword,
    pageToken: nextPageToken,
  })

  try {
    const fetchUrl: string = `https://www.googleapis.com/youtube/v3/search?${paramStr.toString()}`
    const res = await fetch(fetchUrl)
    const data: ResponseYouTubeSearchList = await res.json()

    let results: ResponseYouTubeSearchList['items'] = data.items

    if (data.nextPageToken) {
      const nextResults = await fetchSearchYouTube(supabaseSearchKeywordRecord, data.nextPageToken)
      if (nextResults.items) results = results.concat(nextResults.items)
    }

    return {
      id: supabaseSearchKeywordRecord.id,
      items: results,
    }
  } catch (error) {
    console.error(error)
  }

  return {
    id: supabaseSearchKeywordRecord.id,
    items: [],
  }
}
// 3. search YouTube
const fetchPromises = resSearchKeywords.map((res) => fetchSearchYouTube(res))
const resYouTubeSearchResults = await Promise.all(fetchPromises)

const insertRecords = resYouTubeSearchResults.flatMap((res) =>
  res.items.map((item) => ({
    item,
    search_keyword_id: res.id,
  }))
)

4. 検索結果をinsert

3の検索結果をsearch_resultテーブルに蓄積します。

// 4. insert
const { data: insertResponse, error } = await supabase
  .from('search_result')
  .insert(insertRecords)
  .select()

Supabase Edge functionsにDeploy

開発が完了したのでdeployします。

supabase functions deploy search-youtube-cron-job

ローカルの環境変数を使用している場合は、productionへの反映も行います。

supabase secrets set --env-file ./functions/.env

正しく反映されたかは、下記コマンドで確認できます。

supabase secrets list

あとはSupabase上でpg_cronのextensionをONにした後、SQL Editerにて定期実行の設定コマンドを叩けば終了です。

https://supabase.com/docs/guides/functions/schedule-functions

select
  cron.schedule(
    'invoke-function-every-minute',
    '0 * * * *',
    $$
    select
      net.http_post(
          url:='https://project-ref.supabase.co/functions/v1/search-youtube',
          headers:='{"Content-Type": "application/json", "Authorization": "Bearer YOUR_ANON_KEY"}'::jsonb,
          body:=concat('{"time": "', now(), '"}')::jsonb
      ) as request_id;
    $$
  );

停止は下記から。

select cron.unschedule('search-youtube');

おわりに

Edge Functionsのローカル環境の設定で予想外に詰まってしまいましたが、ひとまず所望の動作はしていそうなので良かったです。

APIリクエストに制限があるデータの蓄積方法は先駆者がいるはずなので、捨てないことが決まったタイミングで書き直してもいいかもしれません。

Discussion