🔧
maintenance_tasks gemで実現する実践的なデータ移行・バッチ処理
maintenance_tasks gemで実現する実践的なデータ移行・バッチ処理
はじめに
大規模なRailsアプリケーションでは、データベースの移行やバッチ処理が避けて通れません。通常のActive Jobでは中断・再開が困難で、長時間の処理中にデプロイが発生すると問題になることがあります。
Shopifyが開発したmaintenance_tasksは、中断可能で再開可能なメンテナンスタスクを簡単に作成・管理できるRails engineです。Web UIでタスクの進行状況を確認でき、本番環境での安全な運用を実現します。
セットアップ
インストール
bundle add maintenance_tasks
bin/rails generate maintenance_tasks:install
bin/rails db:migrate
インストール後、/maintenance_tasks
でWeb UIにアクセスできます。
設定
本番環境では永続化されたキューバックエンドの使用を強く推奨します:
# config/application.rb
config.active_job.queue_adapter = :sidekiq
実践例1: Mongoid to PostgreSQL移行
MongoDB(Mongoid)からPostgreSQLへの移行は、スキーマの違いやデータ変換が必要な典型的なケースです。
# app/models/maintenance/migrate_users_to_postgresql_task.rb
module Maintenance
class MigrateUsersToPostgresqlTask < MaintenanceTasks::Task
# 移行対象の古いMongoユーザーを取得
def collection
# MongoのUserモデルを全件取得(バッチ処理される)
MongoUser.all.limit(10000) # 段階的に実行
end
def process(mongo_user)
# PostgreSQL側に既存チェック
return if User.exists?(legacy_id: mongo_user.id.to_s)
# データ変換とPostgreSQLへの保存
User.create!(
legacy_id: mongo_user.id.to_s,
email: mongo_user.email,
name: mongo_user.full_name, # Mongoの複数フィールドを結合
created_at: mongo_user.created_at&.to_time,
settings: transform_settings(mongo_user.preferences), # JSON変換
status: map_status(mongo_user.state) # ステータス変換
)
# 関連データも移行
migrate_user_posts(mongo_user)
rescue StandardError => e
# エラーログを残して継続
Rails.logger.error "Failed to migrate user #{mongo_user.id}: #{e.message}"
raise e # 再試行のため例外を再発生
end
private
def transform_settings(preferences)
return {} if preferences.blank?
{
theme: preferences['ui_theme'] || 'light',
notifications: preferences['email_notifications'] != false,
timezone: preferences['tz'] || 'UTC'
}
end
def map_status(mongo_state)
case mongo_state
when 'active' then 'confirmed'
when 'pending' then 'unconfirmed'
else 'inactive'
end
end
def migrate_user_posts(mongo_user)
mongo_user.posts.each do |post|
next if Post.exists?(legacy_id: post.id.to_s)
Post.create!(
legacy_id: post.id.to_s,
user: User.find_by(legacy_id: mongo_user.id.to_s),
title: post.title,
content: post.body,
published_at: post.published ? post.created_at : nil
)
end
end
end
end
実践例2: 大量データのバッチ処理
ユーザーデータの一括更新など、パフォーマンスを考慮した処理の例です。
# app/models/maintenance/update_user_statistics_task.rb
module Maintenance
class UpdateUserStatisticsTask < MaintenanceTasks::Task
# スロットリング機能でDB負荷を制御
throttle_on(backoff: 30.seconds) do
DatabaseHealthChecker.high_load?
end
def collection
# バッチサイズを明示的に指定(Rails 7.1+)
User.active.in_batches(of: 500)
end
def process(user_batch)
# バッチ単位で効率的に処理
user_ids = user_batch.pluck(:id)
# 統計データを一括計算
statistics = calculate_user_statistics(user_ids)
# 一括更新でパフォーマンス向上
User.where(id: user_ids).update_all(
posts_count: statistics[:posts_count],
last_activity_at: statistics[:last_activity],
engagement_score: statistics[:engagement_score],
updated_at: Time.current
)
end
private
def calculate_user_statistics(user_ids)
posts_count = Post.where(user_id: user_ids)
.group(:user_id)
.count
last_activities = Activity.where(user_id: user_ids)
.group(:user_id)
.maximum(:created_at)
engagement_scores = calculate_engagement_scores(user_ids)
{
posts_count: posts_count,
last_activity: last_activities,
engagement_score: engagement_scores
}
end
def calculate_engagement_scores(user_ids)
# 複雑な計算ロジック
User.where(id: user_ids).includes(:posts, :comments, :likes)
.map { |user| [user.id, compute_score(user)] }
.to_h
end
def compute_score(user)
posts_score = user.posts.count * 10
comments_score = user.comments.count * 5
likes_score = user.likes.count * 2
(posts_score + comments_score + likes_score) / 17.0
end
end
end
パラメータ付きタスク
動的な条件でタスクを実行したい場合:
module Maintenance
class ProcessOrdersByDateTask < MaintenanceTasks::Task
attribute :start_date, :date
attribute :end_date, :date
attribute :order_status, :string, default: 'pending'
validates :start_date, :end_date, presence: true
validate :end_date_after_start_date
def collection
Order.where(
created_at: start_date.beginning_of_day..end_date.end_of_day,
status: order_status
)
end
def process(order)
OrderProcessor.new(order).execute!
end
private
def end_date_after_start_date
return unless start_date && end_date
errors.add(:end_date, 'must be after start date') if end_date < start_date
end
end
end
運用のポイント
エラーハンドリング
def process(record)
# 個別レコードのエラーは記録して継続
SomeService.process(record)
rescue SomeRetryableError => e
Rails.logger.warn "Retryable error for #{record.id}: #{e.message}"
raise e # 再試行される
rescue SomeSkippableError => e
Rails.logger.error "Skipped #{record.id}: #{e.message}"
# 例外を再発生させないことでスキップ
end
モニタリング
# config/initializers/maintenance_tasks.rb
MaintenanceTasks.error_handler = ->(error, task_context, _errored_element) do
Bugsnag.notify(error) do |report|
report.add_tab(:maintenance_task, {
task_name: task_context[:task_name],
run_id: task_context[:run_id]
})
end
end
本番環境での実行
- 段階的実行: 最初は小さなバッチで動作確認
- オフピーク時間: システム負荷の低い時間帯を選択
- 監視: APMツールでパフォーマンス監視
- ロールバック準備: データの元に戻す手順も用意
まとめ
maintenance_tasksは中断・再開可能で、Web UIから進行状況を確認できる強力なツールです。特に大規模なデータ移行や長時間のバッチ処理において、本番環境での安全性と可視性を大幅に向上させます。
MongoDB to PostgreSQL移行のような複雑なケースでも、適切なエラーハンドリングとスロットリング機能により、安定した運用が可能です。次回の大きなデータ移行の際は、ぜひ検討してみてください。
Discussion