🔧

maintenance_tasks gemで実現する実践的なデータ移行・バッチ処理

に公開

maintenance_tasks gemで実現する実践的なデータ移行・バッチ処理

はじめに

大規模なRailsアプリケーションでは、データベースの移行やバッチ処理が避けて通れません。通常のActive Jobでは中断・再開が困難で、長時間の処理中にデプロイが発生すると問題になることがあります。

Shopifyが開発したmaintenance_tasksは、中断可能で再開可能なメンテナンスタスクを簡単に作成・管理できるRails engineです。Web UIでタスクの進行状況を確認でき、本番環境での安全な運用を実現します。

セットアップ

インストール

bundle add maintenance_tasks
bin/rails generate maintenance_tasks:install
bin/rails db:migrate

インストール後、/maintenance_tasksでWeb UIにアクセスできます。

設定

本番環境では永続化されたキューバックエンドの使用を強く推奨します:

# config/application.rb
config.active_job.queue_adapter = :sidekiq

実践例1: Mongoid to PostgreSQL移行

MongoDB(Mongoid)からPostgreSQLへの移行は、スキーマの違いやデータ変換が必要な典型的なケースです。

# app/models/maintenance/migrate_users_to_postgresql_task.rb
module Maintenance
  class MigrateUsersToPostgresqlTask < MaintenanceTasks::Task
    # 移行対象の古いMongoユーザーを取得
    def collection
      # MongoのUserモデルを全件取得(バッチ処理される)
      MongoUser.all.limit(10000) # 段階的に実行
    end

    def process(mongo_user)
      # PostgreSQL側に既存チェック
      return if User.exists?(legacy_id: mongo_user.id.to_s)

      # データ変換とPostgreSQLへの保存
      User.create!(
        legacy_id: mongo_user.id.to_s,
        email: mongo_user.email,
        name: mongo_user.full_name, # Mongoの複数フィールドを結合
        created_at: mongo_user.created_at&.to_time,
        settings: transform_settings(mongo_user.preferences), # JSON変換
        status: map_status(mongo_user.state) # ステータス変換
      )

      # 関連データも移行
      migrate_user_posts(mongo_user)
    rescue StandardError => e
      # エラーログを残して継続
      Rails.logger.error "Failed to migrate user #{mongo_user.id}: #{e.message}"
      raise e # 再試行のため例外を再発生
    end

    private

    def transform_settings(preferences)
      return {} if preferences.blank?
      
      {
        theme: preferences['ui_theme'] || 'light',
        notifications: preferences['email_notifications'] != false,
        timezone: preferences['tz'] || 'UTC'
      }
    end

    def map_status(mongo_state)
      case mongo_state
      when 'active' then 'confirmed'
      when 'pending' then 'unconfirmed'
      else 'inactive'
      end
    end

    def migrate_user_posts(mongo_user)
      mongo_user.posts.each do |post|
        next if Post.exists?(legacy_id: post.id.to_s)
        
        Post.create!(
          legacy_id: post.id.to_s,
          user: User.find_by(legacy_id: mongo_user.id.to_s),
          title: post.title,
          content: post.body,
          published_at: post.published ? post.created_at : nil
        )
      end
    end
  end
end

実践例2: 大量データのバッチ処理

ユーザーデータの一括更新など、パフォーマンスを考慮した処理の例です。

# app/models/maintenance/update_user_statistics_task.rb
module Maintenance
  class UpdateUserStatisticsTask < MaintenanceTasks::Task
    # スロットリング機能でDB負荷を制御
    throttle_on(backoff: 30.seconds) do
      DatabaseHealthChecker.high_load?
    end

    def collection
      # バッチサイズを明示的に指定(Rails 7.1+)
      User.active.in_batches(of: 500)
    end

    def process(user_batch)
      # バッチ単位で効率的に処理
      user_ids = user_batch.pluck(:id)
      
      # 統計データを一括計算
      statistics = calculate_user_statistics(user_ids)
      
      # 一括更新でパフォーマンス向上
      User.where(id: user_ids).update_all(
        posts_count: statistics[:posts_count],
        last_activity_at: statistics[:last_activity],
        engagement_score: statistics[:engagement_score],
        updated_at: Time.current
      )
    end

    private

    def calculate_user_statistics(user_ids)
      posts_count = Post.where(user_id: user_ids)
                       .group(:user_id)
                       .count

      last_activities = Activity.where(user_id: user_ids)
                               .group(:user_id)
                               .maximum(:created_at)

      engagement_scores = calculate_engagement_scores(user_ids)

      {
        posts_count: posts_count,
        last_activity: last_activities,
        engagement_score: engagement_scores
      }
    end

    def calculate_engagement_scores(user_ids)
      # 複雑な計算ロジック
      User.where(id: user_ids).includes(:posts, :comments, :likes)
          .map { |user| [user.id, compute_score(user)] }
          .to_h
    end

    def compute_score(user)
      posts_score = user.posts.count * 10
      comments_score = user.comments.count * 5
      likes_score = user.likes.count * 2
      
      (posts_score + comments_score + likes_score) / 17.0
    end
  end
end

パラメータ付きタスク

動的な条件でタスクを実行したい場合:

module Maintenance
  class ProcessOrdersByDateTask < MaintenanceTasks::Task
    attribute :start_date, :date
    attribute :end_date, :date
    attribute :order_status, :string, default: 'pending'

    validates :start_date, :end_date, presence: true
    validate :end_date_after_start_date

    def collection
      Order.where(
        created_at: start_date.beginning_of_day..end_date.end_of_day,
        status: order_status
      )
    end

    def process(order)
      OrderProcessor.new(order).execute!
    end

    private

    def end_date_after_start_date
      return unless start_date && end_date
      
      errors.add(:end_date, 'must be after start date') if end_date < start_date
    end
  end
end

運用のポイント

エラーハンドリング

def process(record)
  # 個別レコードのエラーは記録して継続
  SomeService.process(record)
rescue SomeRetryableError => e
  Rails.logger.warn "Retryable error for #{record.id}: #{e.message}"
  raise e # 再試行される
rescue SomeSkippableError => e
  Rails.logger.error "Skipped #{record.id}: #{e.message}"
  # 例外を再発生させないことでスキップ
end

モニタリング

# config/initializers/maintenance_tasks.rb
MaintenanceTasks.error_handler = ->(error, task_context, _errored_element) do
  Bugsnag.notify(error) do |report|
    report.add_tab(:maintenance_task, {
      task_name: task_context[:task_name],
      run_id: task_context[:run_id]
    })
  end
end

本番環境での実行

  1. 段階的実行: 最初は小さなバッチで動作確認
  2. オフピーク時間: システム負荷の低い時間帯を選択
  3. 監視: APMツールでパフォーマンス監視
  4. ロールバック準備: データの元に戻す手順も用意

まとめ

maintenance_tasksは中断・再開可能で、Web UIから進行状況を確認できる強力なツールです。特に大規模なデータ移行や長時間のバッチ処理において、本番環境での安全性と可視性を大幅に向上させます。

MongoDB to PostgreSQL移行のような複雑なケースでも、適切なエラーハンドリングとスロットリング機能により、安定した運用が可能です。次回の大きなデータ移行の際は、ぜひ検討してみてください。

Discussion