😎

今日からはじめるアニメ監視

2024/01/07に公開

TL;DR

  • Redditのアニメチャンネルで日次で盛り上がったアニメを監視します
  • Cloud Run Jobsを日次で起動させます
  • 以下のようにGoogleSheetに日次でアニメのスレッドが書き込まれていきます
    01-google-sheet

はじめに

どうも、アニメマスターです。

皆さんアニメは好きですか?
私は三度の飯より好きです!

アニメ好きを長いことやっているとアニメを観るだけでは飽き足らず、アニメの感想を共有したり、クール内でどのアニメが人気なのかを監視したくなります。
ということで、本稿ではRedditという世界最大の掲示板アプリのアニメチャンネルの盛り上がりを日次監視していきます。

概観

今回作成するGCPの構成図は以下になります。
02-architecture

PythonイメージをArtifact Registryに格納して、
Cloud Runのコンテナを日次でスケジュール実行し、GoogleSheetに書き込みます。

ディレクトリ構成は以下のようになります。

.
├── Dockerfile
├── Makefile
├── README.md
├── apis
│   ├── gcp
│   │   ├── credentials.json
│   │   └── spreadsheet.py
│   └── reddit
│       ├── config.py
│       └── reddit_client.py
├── infra # 詳細は後述します
├── main.py
├── poetry.lock
├── pyproject.toml

1. 下準備

1.1 Poetryを用いたディレクトリの構築

まずはPoetryを使って、ディレクトリを構築していきます。

$ mkdir reddit-monitoring
$ cd reddit-monitoring
$ poetry init

これでreddit-monitoring/配下にpyproject.tomlが作成されました。ではライブラリをインストールしていきましょう。

$ poetry add praw gspread oauth2client pandas

prawはReddit APIのラッパーで、gspreadはGoogleSheet API用のライブラリです。

1-2. GCP Projectでのサービスアカウントの作成

次にGoogleSheetに書き込むためのサービスアカウントをGCP Projectから作成します。これは後ほど実際にGoogleSheetにデータを書き込む際のメールアドレスとして利用します。
GCPコンソールから以下を参考に作成ください。
サービスアカウント

1-3. Reddit APIトークンの発行

次にReddit APIをOAuth2で利用できるようにセットアップしていきます。
以下を参考にCLIENT_ID, CLIENT_SECRET, USER_AGENTを取得してください。
reddit-archive/reddit

取得できたらrefresh_tokenを取得します。このトークンを元にAPIでデータを取得できるようになります。

refresh_tokenを取得するPythonファイル
auth.py
import praw
from apis.reddit.config import CLIENT_ID, CLIENT_SECRET, USER_AGENT

# Initialize the Reddit API client with your client ID, client secret, and user agent.
reddit = praw.Reddit(
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    user_agent=USER_AGENT,
    redirect_uri='http://example.com',
)

# Generate the URL for user authentication
auth_url = reddit.auth.url(
    scopes=['identity', 'read'],
    state='UniqueState',
    duration='permanent',
)

# Print the URL and guide the user to visit it
print(f"Visit this URL to authorize the application: {auth_url}")

# Retrieve the authorization code from the redirected URL
authorization_code = input("Enter the authorization code from the URL: ")

# Use the authorization code to obtain an access token
access_token = reddit.auth.authorize(authorization_code)
print(f"Access Token: {access_token}")
print(f"Refresh Token: {reddit.auth.scopes}")

このファイルを直接実行するとWebページのリンクが取得でき、リダイレクト先のURLからリフレッシュトークンが取得できるのでそれを保存しておきます。

$ python3 auth.py

2. Pythonファイルを作成する

2-1. apis/配下のファイルの作成

まずは外部APIを呼び出すモジュールを作っていきます。
apis/reddit/というフォルダを作ってreddit_client.pyファイルにRedditの投稿を取得するクライアントを作成します。
このファイルはapis/reddit/config.pyから認証情報をインポートしているので、各種認証情報をconfig.pyに設定しておいてください。

Redditの投稿を取得するクライアント
reddit_client.py
from typing import Union
from datetime import datetime
import praw
from apis.reddit.config import CLIENT_ID, CLIENT_SECRET, USER_AGENT, REFRESH_TOKEN


class RedditClient:
    # refresh token: expires 1 year after the last use and can be used indefinitely to generate new hour-long access tokens.
    def __init__(self):
        self.reddit = praw.Reddit(
            client_id=CLIENT_ID,
            client_secret=CLIENT_SECRET,
            user_agent=USER_AGENT,
            refresh_token=REFRESH_TOKEN,
        )

    def search_posts(
        self,
        subreddit_name: str,
        query: str,
        sort: str,
        limit: int,
        time_filter: Union[str, str, str, str, str, str] = ['hour', 'day', 'week', 'month', 'year', 'all'],
    ) -> praw.models.listing.generator.ListingGenerator:
        # search endpoint
        # https://www.reddit.com/dev/api/#GET_search
        # https://praw.readthedocs.io/en/stable/code_overview/models/subreddit.html
        subreddit = self.reddit.subreddit(subreddit_name)
        try:
            generator = subreddit.search(
                query=query,
                sort=sort,
                limit=limit,
                time_filter=time_filter,
            )
        except praw.exceptions.APIException as e:
            print(f"Error: {e}")
        return generator

    def retrieve_posts(
        self,
        generator: praw.models.listing.generator.ListingGenerator,
    ) -> None:
        # [[timestamp, title, score, url], ... ]
        item_2d_array = []
        try:
            while True:
                item = next(generator)
                if 'rewatch' in item.url:
                    continue
                datetime_obj = datetime.fromtimestamp(item.created_utc)
                formatted_date = datetime_obj.strftime('%Y-%m-%d %H:%M:%S')
                timestamp = datetime.now().strftime('%Y%m%d')
                item_2d_array.append([timestamp, item.title, item.score, item.url])
        except StopIteration as e:
            # StopIteration is raised when the iterator is exhausted
            print(e)
        return item_2d_array

次にapis/gcp/というディレクトリを作成してspreadsheet.pyを作成します。
このファイルにGoogleSheetにデータを書き込む処理を記述していきます。
また、書き込む際に1-2で作成したサービスアカウントの認証情報が必要なので、credentials.jsonとしてapis/gcp/credentials.jsonに格納しておいてください。
また、これかの認証情報はGitHub等のPublicなリポジトリには格納しないようご注意ください。

スプレッドシート用クライアント
spreadsheet.py
# https://docs.gspread.org/en/latest/user-guide.html
from typing import List
import gspread
from oauth2client.service_account import ServiceAccountCredentials


class SpreadSheetClient:
    # Set up the credentials
    SCOPE = [
        "https://spreadsheets.google.com/feeds",
        "https://www.googleapis.com/auth/spreadsheets",
        "https://www.googleapis.com/auth/drive.file",
        "https://www.googleapis.com/auth/drive",
    ]

    def __init__(self, credential_path: str):
        creds = ServiceAccountCredentials.from_json_keyfile_name(
            credential_path, self.SCOPE
        )
        self.client = gspread.authorize(creds)

    def get_worksheet(self, spreadsheet_id: str, worksheet_name: str):
        ss = self.client.open_by_key(spreadsheet_id)
        wks = ss.worksheet(worksheet_name)
        return wks

    def write_rows(self, wks, array_of_2d: List[List[str]], start_row: int=2):
        # row: List[str]
        for i, row in enumerate(array_of_2d, start=start_row):
            wks.insert_row(row, i)

2-2. エントリーポイントの作成

次にエントリーポイントであるmain.pyを作成します。このファイルから各種クライアントを呼び出し、実際にデータをGoogleSheetに書き込んでいきます。

エントリーポイント
main.py
import pandas as pd
from typing import List, Dict, Union
from datetime import datetime
from apis.reddit.reddit_client import RedditClient
from apis.gcp.spreadsheet import SpreadSheetClient


if __name__ == '__main__':
    reddit_client = RedditClient()
    spreadsheet_client = SpreadSheetClient('./apis/gcp/credentials.json')
    this_year = str(datetime.now().year)

    params = {
        'subreddit_name': 'anime',
        'query': 'Episode discussion',
        'sort': 'new',
        'limit': 100,
        'time_filter': 'day',
        'sheet_id': YOUR_SHEET_ID, # GoogleSheet IDに変更してください
    }
    subreddit_name = params['subreddit_name']
    query = params['query']
    sort = params['sort']
    limit = params['limit']
    time_filter = params['time_filter']
    generator = self.reddit_client.search_posts(subreddit_name, query, sort, limit, time_filter)
    rows = self.reddit_client.retrieve_posts(generator)
    columns = ['timestamp', 'title', 'score', 'url']
    matrix = pd.DataFrame(rows, columns=columns)
    if datetime.now().month in [1, 2, 3]:
        matrix['season'] = 'winter'
    elif datetime.now().month in [4, 5, 6]:
        matrix['season'] = 'spring'
    elif datetime.now().month in [7, 8, 9]:
        matrix['season'] = 'summer'
    elif datetime.now().month in [10, 11, 12]:
        matrix['season'] = 'fall'
    sorted_df = matrix.sort_values(by='score', ascending=False)

    # write rows to google spreadsheet
    wks = spreadsheet_client.get_worksheet(params['sheet_id'], this_year)
    spreadsheet_client.write_rows(wks, sorted_df.values.tolist())

あとは書き込むGoogleSheetの共有設定でサービスアカウントのメールアドレスに編集権限を設定してあげてエントリーポイントを実行して、データが書きこまれていたら成功です。

$ poetry run python3 main.py

3. Docker化する

では、Cloud Runで実行できるようにコンテナ化していきましょう。
まず、Makefileでコマンドを実行できるようにしていきます。

環境変数のところは適宜ご自身のリソースのIDに変換してください。

Makefile
Makefile
PROJECT_ID=PROJECT_ID
REPOSITORY_ID=REPOSITORY_ID
ARTIFACT_REGISTRY=ARTIFACT_REGISTRY
IMAGE_NAME=IMAGE_NAME
TAG_NAME=$(ARTIFACT_REGISTRY)/$(PROJECT_ID)/$(REPOSITORY_ID)/$(IMAGE_NAME):latest

run:
	poetry run python main.py

# https://cloud.google.com/artifact-registry/docs/docker/pushing-and-pulling
build_and_push:
	docker build -t $(TAG_NAME) .
	docker push $(TAG_NAME)

次にDockerfileを作成します。

Dockerfile
# Use a base image with Python
FROM python:3.11-slim

ENV TZ=Asia/Tokyo

# Install dependencies
RUN apt-get update && \
  apt-get install -y make && \
  pip install poetry

# Set the working directory
WORKDIR /app

# Copy the poetry.lock and pyproject.toml files to the container
COPY pyproject.toml poetry.lock ./

# Create virtualenv at .venv in the project instead of ~/.cache/
# Install production dependencies 
RUN poetry config virtualenvs.in-project true && \
  poetry install --no-dev --no-ansi

# Copy the rest of your application code to the container
COPY . .

# Define the command to run your application
CMD ["make", "run"]

これでDockerイメージを作成し、GCPのArtifact Registryにpushする準備が整いました。

4. GCPリソースをTerraformで作成する

今回必要になるArtifact RegistryCloud RunTerraformを利用して作成していきます。
リソースは作成できさえすれば良いのでTerraformを使わずにコンソールから作成してもらっても構いません。

Terraformで作成する場合は以下のようなディレクトリになります。
以下の構成をreddit-monitoring/infra/配下に作ってもらえればと思います。

.
├── credentials
│   └── credentials.json
├── main.tf
├── modules
│   ├── ar
│   │   ├── main.tf
│   │   └── variables.tf
│   ├── cr
│   │   ├── main.tf
│   │   └── variables.tf
├── terraform.tfstate
├── terraform.tfstate.backup
└── variables.tf

4-1. Terraform用サービスアカウントの作成

1-2の要領でTerraform用のサービスアカウントを作成します。
このサービスアカウントに必要なリソースの権限を付与していきます。

サービスアカウントが作成できたらJSONのキーを取得し、credentials/credentials.jsonに格納しておいてください。
認証情報なのでPublicに露出しないように注意するのも忘れないようにしてください。

4-2. Terraformファイルの作成

まずはTerrafrom Projectを作成していきます。
Terraform CLIをインストールしていない人はこちらからインストールしてください。

$ cd infra
$ terraform init

次にmodules/配下にリソースを作成していきます。

$ mkdir modules
$ mkdir modules/ar
$ mkdir modules/cr

ではArtifact Registryから作成していきます。

Artifact Registryのファイル群
modules/ar/main.tf
# https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/artifact_registry_repository
resource "google_artifact_registry_repository" "reddit-monitoring" {
  project       = var.project
  location      = var.region
  repository_id = "reddit-monitoring"
  description   = "invoke reddit monitoring container"
  format        = "DOCKER"
}
modules/ar/variables.tf
variable "project" {
  description = "A name of a GCP project"
  type        = string
  default     = YOUR_PROJECT_NAME # 変更してください
}

variable "region" {
  description = "A GCP region"
  type        = string
  default     = "asia-northeast1"
}

次にCloud Runのリソースを作成していきます。
スケジューラは毎日6時に起動するようにしていますが、お好みで変更ください。

Cloud Runのファイル群
modules/cr/main.tf
provider "google-beta" {
  region = var.region
}

# Project data
# [START cloudrun_jobs_execute_jobs_on_schedule_parent_tag]
data "google_project" "project" {
}

# https://github.com/terraform-google-modules/terraform-docs-samples/blob/main/run/jobs_create/main.tf
resource "google_cloud_run_v2_job" "reddit-monitoring" {
  provider     = google-beta
  name         = "reddit-monitoring"
  project      = var.project
  location     = var.region
  launch_stage = "BETA"
  template {
    template {
      containers {
        image = var.repository_path
      }
    }
  }
}

# https://github.com/terraform-google-modules/terraform-docs-samples/blob/main/run/jobs_execute_jobs_on_schedule/main.tf
resource "google_cloud_scheduler_job" "reddit-monitoring-job-scheduler-trigger" {
  provider         = google-beta
  name             = "reddit-monitoring-job-scheduler-trigger"
  description      = "http job to excute reddit-monitoring"
  schedule         = "0 6 * * *" # minute hour day-of-month month day-of-week
  time_zone        = "Asia/Tokyo"
  attempt_deadline = "320s"
  region           = var.region
  project          = var.project

  retry_config {
    retry_count = 1
  }

  http_target {
    http_method = "POST"
    uri         = "https://${google_cloud_run_v2_job.reddit-monitoring.location}-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/${var.project}/jobs/${google_cloud_run_v2_job.reddit-monitoring.name}:run"

    oauth_token {
      service_account_email = var.sa_email
    }
  }
}
modules/cr/variables.tf
variable "project" {
  description = "A name of a GCP project"
  type        = string
  default     = YOUR_PROJECT # 変更してください
}

variable "region" {
  description = "A name of a GCP region"
  type        = string
  default     = "asia-northeast1"
}

variable "repository_path" {
  description = "A path of a GCP Artifact Registry repository"
  type        = string
  default     = YOUR_REPOSITORY_PATH # 変更してください
}

variable "sa_email" {
  description = "A service account email"
  type        = string
  default     = YOUR_SERVICE_ACCOUNT_EMAIL # 変更してください
}

それでは最後にルートディレクトリのファイル群を作成していきます。

ルートディレクトリのファイル群
main.tf
terraform {
  required_providers {
    google = {
      source  = "hashicorp/google"
      version = "~> 5.0.0"
    }
  }
}

provider "google" {
  credentials = file("./credentials/credentials.json")

  project = var.project
  region  = "ap-northeast-1"
  zone    = "ap-northeast-1a"
}

module "ar" {
  source = "./modules/ar"
}

module "cr" {
  source = "./modules/cr"
}
variables.tf
variable "project" {
  description = "A name of a GCP project"
  type        = string
  default     = YOUR_PROJECT
}

これでファイルの作成は完了しました。
最後にコマンドを実行して、実際にGCP上にリソースを作成していきましょう。

# コードを整形しておきます
$ terraform fmt
# エラーがないか静的解析します
$ terraform validate
# 実際に作成するリソースを確認します
$ terraform plan
# よければリソースを作成します
$ terraform apply

指定した時間にGoogleSheetに書き込まれていたら完了です!

おわりに

これで毎日放送されるアニメの人気度を直感的に監視することができました。
後は盛り上がってるアニメを観たり、Redditでコメントを残したりして、よりアニメを盛り上げていきましょう!
それでは、良きアニメライフを!!

Discussion