[Django]QuerySetを任意の順番でソートする方法

2021/12/18に公開

はじめに

これは Django Advent Calendar 2021 18日目の記事です(空いていたので飛び入りで)。

例えばレコードを特定のフィールドの値に応じてソートしたい、そのようなことを考えたことはないでしょうか。 Django でそれを実現する方法は少なくとも2種類あります。その方法を紹介します。

前提条件

次の環境で確認しています。

  • Django 3.2
  • Python 3.9

今回は Django 3.0 から導入された IntegerChoices を使ったコードのためそのままでは Django 2.2 では動きませんが、原理的には Django 2.2 でも動作可能です。

モデルを使った例

次のようなシンプルなモデルを考えます。

from django.db import models
from django.db.models import CharField, PositiveSmallIntegerField

class TaskStatus(models.IntegerChoices):
    TODO = 0, "TODO"
    DONE = 1, "完了"
    CANCELLED = 2, "キャンセル"
    WIP = 3, "WIP"

class Task(models.Model):
    name = CharField(max_length=100, verbose_name="タスク名")
    status = PositiveSmallIntegerField(verbose_name="ステータス", choices=TaskStatus.choices)

これに対して次のようにアクセスすると、「TODO > 完了 > キャンセル > WIP」の順番に並びます。

Task.objects.order_by("status")

これを例えば「TODO > WIP > 完了 > キャンセル」のように並べたいときはどうしたらいいでしょうか。

まず考えられるのは TaskStatus を IntegerChoices ではなく、 Model にする方法です。

from django.db import models
from django.db.models import CharField, PositiveSmallIntegerField, ForeignKey

class TaskStatus(models.Model):
    name = CharField(max_length=100, verbose_name="ステータス名")
    sort_order = PositiveSmallIntegerField(verbose_name="ソート順")

class Task(models.Model):
    name = CharField(max_length=100, verbose_name="カテゴリ名")
    status = ForeignKey(TaskStatus, on_delete=models.PROTECT, verbose_name="ステータス")

この場合、次のようにすればソートされた QuerySet が取得できます。

Task.objects.select_related("status").order_by("status__sort_order")

もちろんモデルを定義するため、他のフィールドを追加したり、他のメソッドを定義することができます。拡張性が必要な場合はこの方法がいいでしょう。

Case〜Whenを使った例

もう1つは、 Case, When を使って実現する方法です。 SQL で書くと次のようになります。

SELECT *,
       CASE
         WHEN status = 0 THEN 0
         WHEN status = 1 THEN 3
         WHEN status = 2 THEN 4
         WHEN status = 3 THEN 2
       END AS custom_order
FROM   task
ORDER  BY custom_order;

これを Django で書くと次のようになります。

from django.db import models
from django.db.models import CharField, PositiveSmallIntegerField, Case, When

class TaskStatus(models.IntegerChoices):
    TODO = 0, "TODO"
    DONE = 1, "完了"
    CANCELLED = 2, "キャンセル"
    WIP = 3, "WIP"

class TaskQuerySet(models.QuerySet):
    def __init__(self, model=None, query=None, using=None, hints=None):
        super().__init__(model, query, using, hints)

    def order_by_status(self):
        return self.annotate(custom_order=Case(
            When(status=TaskStatus.TODO, then=1),
            When(status=TaskStatus.DONE, then=3),
            When(status=TaskStatus.CANCELLED, then=4),
            When(status=TaskStatus.WIP, then=2),
        )).order_by("custom_order")

class TaskManager(models.Manager):
    def get_queryset(self):
        return TaskQuerySet(self.model, using=self._db)

    def order_by_status(self):
        return self.get_queryset().order_by_status()

class Task(models.Model):
    name = CharField(max_length=100, verbose_name="タスク名")
    status = PositiveSmallIntegerField(verbose_name="ステータス", choices=TaskStatus.choices)

    objects = TaskManager()

    def __str__(self):
        return f"{self.name} {self.status}"

Manager 、 QuerySet を独自で定義したため長くなっていますが、ポイントは次のメソッドです。 TaskStatus の選択肢1つずつに値を割り当て、それを custom_order という名前にして、その値でソートしています。

class TaskQuerySet(models.QuerySet):
    def order_by_status(self):
        return self.annotate(custom_order=Case(
            When(status=TaskStatus.TODO, then=1),
            When(status=TaskStatus.DONE, then=3),
            When(status=TaskStatus.CANCELLED, then=4),
            When(status=TaskStatus.WIP, then=2),
        )).order_by("custom_order")

このとき、次のようにすれば、「TODO > WIP > 完了 > キャンセル」の順にソートされた QuerySet が取得できます。

Task.objects.order_by_status()

Case〜Whenを書きやすくする

しかし、全ての選択肢に対して When を書くのは面倒です。もっと楽に書きたい。そのためには次のようにします。まずコードを全て掲載します。この qs_custom_order がポイントです。

from django.db import models
from django.db.models import CharField, PositiveSmallIntegerField, Case, When, Value, IntegerField

def qs_custom_order(qs, key: str, sort_order: tuple):
    cases = []
    for index, value in enumerate(sort_order):
        when_params = {
            key: value,
            "then": Value(index),
        }
        cases.append(When(**when_params))

    return qs.annotate(custom_order=Case(*cases, output_field=IntegerField())).order_by("custom_order")

class TaskStatus(models.IntegerChoices):
    TODO = 0, "TODO"
    DONE = 1, "完了"
    CANCELLED = 2, "キャンセル"
    WIP = 3, "WIP"

    @staticmethod
    def sort_order():
        return (
            TaskStatus.TODO,
            TaskStatus.WIP,
            TaskStatus.DONE,
            TaskStatus.CANCELLED,
        )

class TaskQuerySet(models.QuerySet):
    def __init__(self, model=None, query=None, using=None, hints=None):
        super().__init__(model, query, using, hints)

    def order_by_status(self):
        return qs_custom_order(self, key="status", sort_order=TaskStatus.sort_order())

class TaskManager(models.Manager):
    def get_queryset(self):
        return TaskQuerySet(self.model, using=self._db)

    def order_by_status(self):
        return self.get_queryset().order_by_status()

class Task(models.Model):
    name = CharField(max_length=100, verbose_name="タスク名")
    status = PositiveSmallIntegerField(verbose_name="ステータス", choices=TaskStatus.choices)

    objects = TaskManager()

    def __str__(self):
        return f"{self.name} {self.status}"

まず、qs_custom_order の使用箇所のみ抜き出すと次のようになります。 status フィールドをキーとして、ソート順を TaskStatus.sort_order() として定義します。

class TaskQuerySet(models.QuerySet):
    def order_by_status(self):
        return qs_custom_order(self, key="status", sort_order=TaskStatus.sort_order())

次に、 qs_custom_order の定義です。渡された sort_order をループして、 When オブジェクトを生成していき、値を custom_order という名前で追加し、ソートしています。

def qs_custom_order(qs, key: str, sort_order: tuple):
    cases = []
    for index, value in enumerate(sort_order):
        when_params = {
            key: value,
            "then": Value(index),
        }
        cases.append(When(**when_params))

    return qs.annotate(custom_order=Case(*cases, output_field=IntegerField())).order_by("custom_order")

TaskStatus のソート順の定義は次のようになります。 TaskStatus の中に含まれているため読みやすく、ステータスが増えたときに変更し忘れるリスクも少ないです。

class TaskStatus(models.IntegerChoices):
    TODO = 0, "TODO"
    DONE = 1, "完了"
    CANCELLED = 2, "キャンセル"
    WIP = 3, "WIP"

    @staticmethod
    def sort_order():
        return (
            TaskStatus.TODO,
            TaskStatus.WIP,
            TaskStatus.DONE,
            TaskStatus.CANCELLED,
        )

おわりに

今回のコードはGitHubに公開しています。

https://github.com/ikemo3/django-example/pull/16

Case〜Whenを使うことで、任意の順に並び替えをすることができました。 qs_custom_order の実装を変えることで、同じ順位を認めたり、単に annotate でフィールドを追加するといった応用も可能です。ぜひ活用してみてください。

Discussion