【Go】TMDbから取得した映画データを効率的にDBへ保存するモジュールをgoroutineで実装してみた
概要
個人開発で映画のレビューアプリを作成する際、TMDb(The Movie Database)からデータを取得しようと思ったのですが、アプリ内で都度TMDb APIを実行するのはレート制限や実行速度に懸念があり、極力避けたいと考えていました。
よってアプリ側のDBで映画データを持つべく、TMDb APIの実行結果をDBに格納するモジュール(コマンド?)を作成しました。
ここ一年ほど実務で使用していることもあり、言語はGoを使用しています。そこそこのデータ量を扱うというところで、業務であまり使ってこなかったゴルーチン(goroutine)を使った並行処理についてもこの機にキャッチアップしてしまおうという魂胆です。
リポジトリ
映画レビューアプリのリポジトリに格納しています。(環境構築程度しかできていませんが…。絶賛実装中です。)
事前準備と開発環境
TMDb API: TMDbの公式サイトでAPIキーを取得します。
Go: ver1.22.5
DB: MySQL8.0
ORM: GORM
モジュールの概要
実行ファイルmain.go内に処理を記述します。なお、DB登録処理については別途リポジトリを用意していた(映画レビューアプリ本体で使用する想定のDAOとrepository)ため、そちらを利用します。
上記ファイルを実行するスクリプトを作成し、shコマンドで実行する想定です。
echo "Starting TMDB data fetch at $(date)"
go run main.go
echo "Finished TMDB data fetch at $(date)"
モジュール内では、年単位で映画の作品データを取得し、DBにデータを格納します。取得する期間は1950~2025年
になります。
対象年度ごとの検索処理に対してゴルーチンを発行し、データ取得処理を並行処理化します。
また、TMDb APIの実行についてはgolang-tmdbパッケージを使用します。
TMDb APIの仕様について
なお、TMDb APIのレート制限は下記です。
- 同一IPアドレスからの同時接続数が最大20を超えないこと
- 1秒間に送信するリクエスト数が50を超えないこと
よって上記を超えないよう、ゴルーチンのプロセス数を10件、プロセス全体の秒間リクエスト数を40件に制限していきます。
またAPIの仕様上、検索条件に対して20件500ページ(10000万件)までしか取得できないため、各年度ごとに人気(ポピュラリティ)上位10000万件を取得する形にします。
(それ以上はかなりニッチな作品の情報になる気がするので…。ひとまずはこれでよしとします。)
今回使用するエンドポイントdiscover/movieで取得できない項目もある(上映時間やキャスト情報など)ので、それらは別途バッチ処理を実装して個別に取得するようにしたいと思っています。
処理の流れ
大まかな流れは下記です。
- コンテキストやDB、TMDbクライアントの初期化処理
- レート制限の設定
- ゴルーチンのワーカーを起動(データ取得処理用と実行結果格納用)
- 実行する年度を配布し、各ゴルーチンで対象年度の作品情報を取得してDBに格納
- 全てのゴルーチンが完了したら処理終了
作品取得処理(ゴルーチン)の流れは下記。
- 指定された年度から検索オプションを作成
- レート制限のチェック
- TMDb APIのDiscoverMovie関数を実行し、ページごとに作品情報を取得
- 検索結果をDBにinsert
- 結果を返して終了
実装
main.go(全体像)
const (
maxPages = 500 // TMDbの検索結果に対して取得できる最大ページ数
startYear = 1950 // 検索開始年度
endYear = 2025 // 検索終了年度
maxWorkers = 10 // 同時に処理する年数
)
func main() {
// 初期化処理
ctx := context.Background()
db := db.Init()
repo := repositoryImpl.NewMovieRepositoryImpl(db)
config := tmdb.Config{
APIKey: os.Getenv("TMDB_API_KEY"),
Proxies: nil,
UseProxy: false,
}
tmdbClient := tmdb.Init(config)
if tmdbClient == nil {
panic("failed tmdb client")
}
// API全体で共有する単一のレートリミッター
// TMDbのレート制限が秒間50リクエストのため、少し余裕を持って40に設定
rateLimiter := rate.NewLimiter(rate.Every(time.Second/40), 40)
var wg sync.WaitGroup
yearCh := make(chan int)
resultCh := make(chan string)
// ワーカーの起動
for i := 0; i < maxWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for year := range yearCh {
if err := processYear(ctx, year, tmdbClient, repo, rateLimiter); err != nil {
resultCh <- fmt.Sprintf("Error processing year %d: %v", year, err)
continue
}
resultCh <- fmt.Sprintf("Successfully processed year %d", year)
}
}()
}
// 結果を受け取るゴルーチン
go func() {
for result := range resultCh {
log.Println(result)
}
}()
// 年の配布
for year := startYear; year <= endYear; year++ {
yearCh <- year
}
close(yearCh)
// 全ワーカーの完了を待つ
wg.Wait()
close(resultCh)
fmt.Println("All processing completed")
}
部分ごとに見ていきます。
初期化処理
// 初期化処理
ctx := context.Background()
db := db.Init()
repo := repositoryImpl.NewMovieRepositoryImpl(db)
config := tmdb.Config{
APIKey: os.Getenv("TMDB_API_KEY"),
Proxies: nil,
UseProxy: false,
}
tmdbClient := tmdb.Init(config)
if tmdbClient == nil {
panic("failed tmdb client")
}
コンテキストやGORM、TMDbパッケージのクライアントの初期化を行います。
repositoryImplはGORMを使用してmoviesテーブルにアクセスするリポジトリの実装になります。この辺りの構成などについては別途記事にて紹介する予定です。
レートリミッター設定
// API全体で共有する単一のレートリミッター
// TMDbのレート制限が秒間50リクエストのため、少し余裕を持って40に設定
rateLimiter := rate.NewLimiter(rate.Every(time.Second/40), 40)
レートリミッターを設定します。
今回は全てのプロセスで共通のレートリミッターとして定義し、秒間40リクエストの制限を設定しています。バーストも一応合わせて40で設定しました。
参考記事:
ゴルーチンの設定と起動
var wg sync.WaitGroup
yearCh := make(chan int)
resultCh := make(chan string)
// ワーカーの起動
for i := 0; i < maxWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for year := range yearCh {
if err := processYear(ctx, year, tmdbClient, repo, rateLimiter); err != nil {
resultCh <- fmt.Sprintf("Error processing year %d: %v", year, err)
continue
}
resultCh <- fmt.Sprintf("Successfully processed year %d", year)
}
}()
}
// 結果を受け取るゴルーチン
go func() {
for result := range resultCh {
log.Println(result)
}
}()
// 年の配布
for year := startYear; year <= endYear; year++ {
yearCh <- year
}
close(yearCh)
// 全ワーカーの完了を待つ
wg.Wait()
close(resultCh)
並行処理は主に3つの部分で構成されています。
- ワーカープール(処理の実行者)
- タスクの配布(年の配布)
- 結果の収集
[メインゴルーチン]
│
├──> [yearCh] ──┬──> [Worker 1] ──┐
│ ├──> [Worker 2] ──┼──> [resultCh] ──> [結果収集ゴルーチン]
│ └──> [Worker 3] ──┘
│
└──> wg.Wait() (全ワーカー完了待ち)
ゴルーチンの流れは上記になります。
実装としては、先に取得処理と結果収集のワーカーを起動させておき、年(タスク)の配布を行なってワーカー側で受け取っていくような流れです。
年の配布処理を先に行なってしまうと、受け取り手(ワーカー)がいないためデッドロックが起こります。
参考記事:
作品データ取得処理の全体像
// 対象年度の作品情報を取得し、DBにUpsertする関数
func processYear(ctx context.Context, year int, tmdbClient *tmdb.TMDb, repo *repositoryImpl.MovieRepositoryImpl, rateLimiter *rate.Limiter) error {
startDate := fmt.Sprintf("%d-01-01", year)
endDate := fmt.Sprintf("%d-12-31", year)
// TMDbの検索条件を設定
options := map[string]string{
"primary_release_date.gte": startDate,
"primary_release_date.lte": endDate,
"language": "ja-JP", // 作品情報を日本語で取得
"region": "JP", // 日本で公開された作品を取得
"sort_by": "popularity.desc", // 人気順でソートして取得
"page": "1",
}
var allMovies []*domain.Movie
currentPage := 1
totalPages := maxPages
for currentPage <= totalPages {
// レート制限のチェックを行い、40リクエストを超える場合リクエスト可能になるまで待機
if err := rateLimiter.Wait(ctx); err != nil {
return fmt.Errorf("rate limit error: %v", err)
}
// ページを設定してTMDb APIを打鍵
options["page"] = fmt.Sprintf("%d", currentPage)
pageResult, err := tmdbClient.DiscoverMovie(options)
if err != nil {
return fmt.Errorf("failed to fetch page %d: %v", currentPage, err)
}
// 取得できる最大ページ数の制限があるため、総ページ数が500を超えていたらそれ以上取得しないようにする
if currentPage == 1 {
totalPages = pageResult.TotalPages
if totalPages > maxPages {
totalPages = maxPages
}
}
// 取得した作品情報をモデルに格納してスライスに追加
for _, m := range pageResult.Results {
releaseDate, err := time.Parse("2006-01-02", m.ReleaseDate)
if err != nil {
continue
}
movie := &domain.Movie{
MovieID: m.ID,
Title: m.Title,
Overview: m.Overview,
ReleaseDate: *types.NewDate(releaseDate),
TMDBImageURL: m.PosterPath,
Popularity: m.Popularity,
VoteAverage: m.VoteAverage,
VoteCount: m.VoteCount,
}
allMovies = append(allMovies, movie)
}
currentPage++
}
if len(allMovies) > 0 {
// 1000件ごとにチャンクしてInsert
const batchSize = 1000
for i := 0; i < len(allMovies); i += batchSize {
end := i + batchSize
if end > len(allMovies) {
end = len(allMovies)
}
if err := repo.BulkInsertMovies(allMovies[i:end]); err != nil {
return fmt.Errorf("failed to insert movies batch: %v", err)
}
}
}
return nil
}
こちらも部分ごとに見ていきます。
検索オプションの作成
startDate := fmt.Sprintf("%d-01-01", year)
endDate := fmt.Sprintf("%d-12-31", year)
// TMDbの検索条件を設定
options := map[string]string{
"primary_release_date.gte": startDate,
"primary_release_date.lte": endDate,
"language": "ja-JP", // 作品情報を日本語で取得
"region": "JP", // 日本で公開された作品を取得
"sort_by": "popularity.desc", // 人気順でソートして取得
"page": "1",
}
検索オプションを作成します。
(オプション名を手書きしないといけないのはオプションを調べたり若干手間だなあと思いつつ…。)
多言語化対応なども特に考えていないため、今回は日本語で情報を取得し、日本で公開された作品のみ取得するようにします。
レート制限のチェック
// レート制限のチェックを行い、40リクエストを超える場合リクエスト可能になるまで待機
for currentPage <= totalPages {
if err := rateLimiter.Wait(ctx); err != nil {
return fmt.Errorf("rate limit error: %v", err)
}
rateLimiterに設定したレート制限を超えていないかチェックを行います。
超えていた場合、リクエスト可能になるまで待機します。
作品情報取得処理(TMDb APIを打鍵)
// ページを設定してTMDb APIを打鍵
options["page"] = fmt.Sprintf("%d", currentPage)
pageResult, err := tmdbClient.DiscoverMovie(options)
if err != nil {
return fmt.Errorf("failed to fetch page %d: %v", currentPage, err)
}
// 取得できる最大ページ数の制限があるため、総ページ数が500を超えていたらそれ以上取得しないようにする
if currentPage == 1 {
totalPages = pageResult.TotalPages
if totalPages > maxPages {
totalPages = maxPages
}
}
ページごとにTMDb APIを打鍵し、作品情報を取得します。
最初のループの際、取得した総ページ数をセットします。
作品情報をモデルに格納
// 取得した作品情報をモデルに格納してスライスに追加
for _, m := range pageResult.Results {
releaseDate, err := time.Parse("2006-01-02", m.ReleaseDate)
if err != nil {
continue
}
movie := &domain.Movie{
MovieID: m.ID,
Title: m.Title,
Overview: m.Overview,
ReleaseDate: *types.NewDate(releaseDate),
TMDBImageURL: m.PosterPath,
Popularity: m.Popularity,
VoteAverage: m.VoteAverage,
VoteCount: m.VoteCount,
}
allMovies = append(allMovies, movie)
}
currentPage++
movies
テーブルに格納するため、取得したデータをドメインモデルに変換します。
後々repository内でドメインモデルをDAOモデルに変換し、Insertします。
なお、movies
テーブルの各カラムの型情報はgolang-tmdb
で定義されている映画情報の構造体に合わせて実装しています。
データ登録処理
if len(allMovies) > 0 {
// 1000件ごとにチャンクしてInsert
const batchSize = 1000
for i := 0; i < len(allMovies); i += batchSize {
end := i + batchSize
if end > len(allMovies) {
end = len(allMovies)
}
if err := repo.BulkInsertMovies(allMovies[i:end]); err != nil {
return fmt.Errorf("failed to insert movies batch: %v", err)
}
}
}
return nil
BulkInsertMovies
では、ON DUPLICATE UPDATEクエリを使用することで、主キーに重複があった場合には更新するようにしています。
普通にInsertしていたところ、特定の年度で主キーの重複エラーとなったためです。
実行結果
Starting TMDB data fetch at 2024年 11月17日 日曜日 23時02分26秒 JST
2024/11/17 23:07:17 Successfully processed year 1950
2024/11/17 23:07:27 Successfully processed year 1952
2024/11/17 23:07:28 Successfully processed year 1951
2024/11/17 23:07:34 Successfully processed year 1953
2024/11/17 23:07:42 Successfully processed year 1954
2024/11/17 23:07:54 Successfully processed year 1955
2024/11/17 23:08:13 Successfully processed year 1956
2024/11/17 23:08:30 Successfully processed year 1957
2024/11/17 23:08:50 Successfully processed year 1958
2024/11/17 23:08:50 Successfully processed year 1959
2024/11/17 23:14:01 Successfully processed year 1960
2024/11/17 23:14:27 Successfully processed year 1961
実行して少し経つと、上記のようにログが出力されました。いい感じに並行処理が実行できていそうですね。この時点でDBにデータが格納されていることも確認できました。
All processing completed
しばらく放置していると、無事処理が完了しました。
実行時間については、10年分がおよそ20分強、76年分でおよそ2.5時間といったところでした。
(元々は直列で処理を書いていたのですが、一体何時間かかったんだろうか…。)
今回の設定ではレート制限に引っかかることもありませんでした。プロセス数をもう少し増やしてみてもいいかもしれません。
取得した総件数は478236
でした。
まとめ
モジュールの実装だけでも思ったより情報量が多く、要点だけ抑えるような形になってしまいました。
実務でも外部APIのドキュメントを調べて設計・実装に落とし込んでいく作業は度々発生しますが、今回はかなりいい勉強になりました。
一番危惧していたのがレート制限でしたが、秒間50リクエストとかなり緩い制限で救われました。
ゴルーチンについてもわかっているつもりで理解できていない部分が多そうなので、引き続きキャッチアップしていきたいです。
ひとまず本モジュールが完成したことで、アプリのベースになるデータを取得することができたので、引き続きアプリの制作を進めたいと思います。
(近頃業務がまあまあ忙しく…なかなか時間は取れませんが…😢)
Discussion