👩‍💻

#135 【Flet】タスクベースのProgressBarを作ってみた

に公開

はじめに

最近、Fletを使用してタスクベースのProgressBarを作ってみましたので自身の備忘録と共有も兼ねて記載します。

Fletとは?

FletとはPythonのライブラリで、特段フロントエンドの知識がなくても、Web、デスクトップ、モバイルアプリを簡単に構築できるフレームワークです。

参考:https://flet.dev/docs/#what-is-flet

目標

改めて本記事の目的を整理しますと、既にFletが用意してくれている ProgressBar のドキュメントですが、以上のコンポーネントを使用しながら

タスク:○○を実行中... [■■■------] 1/3

というような形でタスクの進行状況とタスク数を主にしたタスクベースでのProgressBarを作ってみるのが目標です。

要件

  1. ProgressBarはTaskの進捗を受け取って画面に反映する
  2. Taskの進捗を他のコンポーネントでも受け取れる

以上の要件を元に実装を進めます。

実装の流れ

以下の流れで実装を進めます。

  1. Observer クラスを作る
  2. Subject クラスを作る
  3. Task クラスを作る
  4. TaskCollection クラスを作る
  5. TaskBarクラス(コンポーネント)を作る

各クラスを実装しながらクラスの役割などを説明していきます。

Observer & Subject

今回、複数コンポーネントにTaskの状態を通知するということでObserverパターンを使用して実装します。

まずは以下のようにObserverのInterfaceとSubjectの抽象クラスを作成します。

observer.py
from abc import ABC, abstractmethod


class Observer(ABC):
    @abstractmethod
    def update(self):
        pass

subject.py
from abc import ABC, abstractmethod
from typing import List
from features.commons._abstracts.observer import Observer


class Subject(ABC):
    def __init__(self):
        self._observers: List[Observer] = []

    def attach(self, observer: Observer):
        self._observers.append(observer)

    def detach(self, observer: Observer):
        self._observers.append(observer)

    @abstractmethod
    def notify(self):
        pass

関係としては、Observerが監視者としてSubjectの状態を監視します。
Subjectが特定のタイミングで notify() メソッドを呼び出し、Subjectにattachされた各Observerの update() メソッドを呼び出します。
実際に具象クラスに起こす際には、Observerの update() にSubjectから通知があった際の処理を記述します。
今回はTaskBarがObserverの具象クラスとして、各Task(Subject) の通知を受け取り画面に反映することになります。
また、今回Subjectの notify() メソッドは実装先で update() に渡す値が異なることを考え抽象化しています。

TaskSubject

前項で作成したSubjectを継承してTaskSubjectという抽象クラスを作成します。
本クラスがTaskの進捗に変更があった際にattachされたObserverに対して通知を送るというような機能を持つクラスになります。
まずはTaskのステータスをEnumで定義します。

task_status.py
from enum import Enum


class TaskStatus(Enum):
    INIT = "INIT"
    PROGRESSING = "PROGRESSING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"

そうしたらTaskSubjectという抽象クラスを作成します。

task_subject.py
from abc import ABC, abstractmethod
from features.commons._abstracts.subject import Subject
from features.tasks.task_status import TaskStatus


class TaskSubject(Subject, ABC):
    def __init__(self, name: str):
        super().__init__()
        self._name = name
        self._status = TaskStatus.INIT

    def set_status(self, status: TaskStatus):
        self._status = status
        self.notify()

    def notify(self):
        for observer in self._observers:
            observer.update(self._name, self._status)

    @abstractmethod
    def execute(self) -> bool:
        pass

以上が前項で作成したSubject継承したTaskSubjectという抽象クラスです。
TaskSubjectは新たに「name と status」を持ち、 set_status() でタスクのステータスが変わると notify() メソッドで各observerに対し通知を送ります。
このクラスを継承し、各実行タスクの具象クラスを作成していくことになりますが、 execute() という抽象メソッドの内容は継承先で実装します。
例として、以下4つのタスクを作成してみます。

init_task.py
from features.tasks._abstracts.task_subject import TaskSubject
from features.tasks.task_status import TaskStatus
from time import sleep


class InitTask(TaskSubject):
    def __init__(self, name):
        super().__init__(name)

    def execute(self) -> bool:
        # 初期化処理タスク
        # ステータスを作業中にする
        self.set_status(TaskStatus.PROGRESSING)

        sleep(2)  # ←なんらかの処理

        # 今回はテストなので確定で成功のステータスをセットする
        self.set_status(TaskStatus.SUCCESS)

        return self._status == TaskStatus.SUCCESS

login_task.py
from features.tasks._abstracts.task_subject import TaskSubject
from features.tasks.task_status import TaskStatus
from time import sleep


class LoginTask(TaskSubject):
    def __init__(self, name):
        super().__init__(name)

    def execute(self) -> bool:
        # 初期化処理タスク
        # ステータスを作業中にする
        self.set_status(TaskStatus.PROGRESSING)

        sleep(2)  # ←なんらかの処理

        # 今回はテストなので確定で成功のステータスをセットする
        self.set_status(TaskStatus.SUCCESS)

        return self._status == TaskStatus.SUCCESS

failed_task.py
from features.tasks._abstracts.task_subject import TaskSubject
from features.tasks.task_status import TaskStatus
from time import sleep


class FailedTask(TaskSubject):
    def __init__(self, name):
        super().__init__(name)

    def execute(self) -> bool:
        # 初期化処理タスク
        # ステータスを作業中にする
        self.set_status(TaskStatus.PROGRESSING)

        sleep(2)  # ←なんらかの処理

        # 今回はテストなので確定で失敗のステータスをセットする
        self.set_status(TaskStatus.FAILED)

        return self._status == TaskStatus.SUCCESS

success_task.py
from features.tasks._abstracts.task_subject import TaskSubject
from features.tasks.task_status import TaskStatus
from time import sleep


class SuccessTask(TaskSubject):
    def __init__(self, name):
        super().__init__(name)

    def execute(self) -> bool:
        # 初期化処理タスク
        # ステータスを作業中にする
        self.set_status(TaskStatus.PROGRESSING)

        sleep(2)  # ←なんらかの処理

        # 今回はテストなので確定で成功のステータスをセットする
        self.set_status(TaskStatus.SUCCESS)

        return self._status == TaskStatus.SUCCESS

各内容を見ていくとコメントが違うだけですが、本来は sleep() を呼び出している箇所でTask固有の処理をしている物と考えます。
以上のようにTaskSubjectを継承し、実行処理内で set_status() に進捗ステータスをセットすることで各observerに進捗が通知されるという所まで実装できました。

TaskCollection

それでは、作成したTaskの一括実行や、observerの一括attachなどを行うクラスを作成していきます。

task_collection.py
from typing import List
from features.commons._abstracts.observer import Observer
from features.tasks._abstracts.task_subject import TaskSubject
from time import sleep


class TaskCollection:
    def __init__(self):
        self._tasks: List[TaskSubject] = []

    def add_tasks(self, tasks: List[TaskSubject]):
        self._tasks.extend(tasks)

    def remove_task(self, task: TaskSubject):
        self._tasks.remove(task)

    def attach_observer(self, observer: Observer):
        for task in self._tasks:
            task.attach(observer)

    def detach_observer(self, observer: Observer):
        for task in self._tasks:
            task.detach(observer)

    def get_total_task(self):
        # タスク数を返す
        return len(self._tasks)

    def execute(self):
        for task in self._tasks:
            sleep(1)
            # Taskの実行メソッドを呼び出す
            result = task.execute()

            # Taskが完了しなかった場合には以降のタスク実行を中止
            if not result:
                break

このクラスの持つ機能としては基本的にTaskの追加削除などの管理、Taskに対してobserverのattachやdetach、登録されたTaskの実行を行います。
そして、実行中にTaskが完了しなかった場合には以降のタスク実行を中止します。
基本的にはこのクラスに登録されているTaskの数をProgressBarに渡すことで総タスク数を表示します。

TaskBar

ここまでで下準備が完了しましたので、とうとう画面側のコンポーネントを実装していきます。

  1. 実行済みタスク/総実行タスク => 例:1/3 のような表示で現在の実行中タスク位置がわかる
  2. コンポーネントは自身を登録したTaskからの情報や進捗を受け取る
  3. 受け取った通知から進捗ステータス毎に処理を分岐
  4. 分岐先でコンポーネント内の要素を更新する
  5. 画面をupdateする

以上の機能を実装したコンポーネントが以下です。

task_bar.py
import flet as ft
from features.commons._abstracts.observer import Observer
from features.tasks.task_status import TaskStatus


class TaskBar(ft.Column, Observer):
    def __init__(self, total_task: int):
        super().__init__()

        self.total_task = total_task
        self.progressing_task_count = 0

        # ProgressBarの作成
        self.progress_bar = ft.ProgressBar(
            value=0.0, color=ft.colors.CYAN_500, expand=1
        )

        # タスクの進捗率(0/n)という表示用のText
        self.task_ratio_text = ft.Text(
            value=f"{self.progressing_task_count}/{self.total_task}"
        )

        # タスクのメッセージを表すText
        self.task_message_text = ft.Text(value="タスクの実行を開始します")

        # タスクのステータスを表すText(最初はタスクの実行が無いので非表示)
        self.task_status_text = ft.Text(value="", visible=False)

    def update(self, name: str, status: TaskStatus):
        # 共通で実行する処理
        self.task_message_text.value = f"タスク:{name} が進行中です"
        self.task_ratio_text.value = f"{self.progressing_task_count}/{self.total_task}"

        # 通知されたステータス毎に処理を分ける
        match status:
            case TaskStatus.PROGRESSING:
                self.task_status_text.visible = False
                self.progressing_task_count += 1
                self.progress_bar.value += 1 / self.total_task
            case TaskStatus.SUCCESS:
                if self.total_task == self.progressing_task_count:
                    self.progress_bar.value = 1.0
                    self.progress_bar.color = ft.colors.LIGHT_GREEN_500
                    self.task_message_text.value = "タスクの実行が全て"
                    self.task_status_text.value = "完了しました"
                else:
                    self.task_status_text.value = "完了 ◯"
                self.task_status_text.visible = True
                self.task_status_text.color = ft.colors.LIGHT_GREEN_500
            case TaskStatus.FAILED:
                self.task_status_text.value = "失敗 ✕"
                self.task_status_text.color = ft.colors.RED_500
                self.task_status_text.visible = True
                self.progress_bar.color = ft.colors.RED_500

        # UIの更新
        super().update()

    def build(self):
        self.controls = [
            ft.Row(controls=[self.task_message_text, self.task_status_text]),
            ft.Row(controls=[self.progress_bar, self.task_ratio_text]),
        ]

以上が今回の主目的であるタスクベースのProgressBarを実装したコンポーネントになります。
コンポーネント自体がObserverを実装しているため、 update() メソッドでTaskSubjectからの進捗通知を受け取ります。
そして、受け取った通知の進捗ステータスで画面表示を変えるためにcaseで処理を分岐し、同時に ft.Column を継承するクラスでもあるため、 super().update() を実行し画面を更新します。
こうすることでSubjectからの進捗通知でクラス内の情報を更新後に画面も更新する処理ができました。

動作確認

では実際にコンポーネントを画面に追加して動かしてみます。

main.py
import flet as ft
from features.components.progress_bar.task_bar import TaskBar
from features.tasks.task_collection import TaskCollection
from features.tasks.init_task import InitTask
from features.tasks.login_task import LoginTask
from features.tasks.success_task import SuccessTask
from features.tasks.failed_task import FailedTask


def main(page: ft.Page):
    task_collection = TaskCollection()
    task_collection.add_tasks(
        [
            InitTask("初期化タスク"),
            LoginTask("ログインタスク"),
            FailedTask("失敗確定タスク"),
            SuccessTask("確定成功タスク"),
        ]
    )
    progress_bar = TaskBar(task_collection.get_total_task())
    task_collection.attach_observer(progress_bar)

    page.add(
        progress_bar,
        ft.TextButton(text="StartTask!!", on_click=lambda e: task_collection.execute()),
    )


ft.app(main)

  1. 各TaskをTaskCollectionに追加
  2. TaskBarをインスタンス化
  3. 各TaskにTaskBarをObserverとしてattach
  4. TextButtonの on_click イベントにTaskCollectionの execute(実行) を登録

という手順を行っています。
以上のコードで画面上の [StartTask!!] ボタンをクリックすると...

以下gifのようにProgressBarがTaskの進捗と共に変わっていくことが確認できます。

以上の例だとTaskに FailedTask が登録されているため実行の途中で失敗してしまいますが、配列から削除して再度実行してみると全てのタスクが完了した旨のメッセージが表示されます。

おまけ(リファクタリング)

現在の TaskBar 内の update() で行われているTaskのstatusで分岐する処理をStrategyを使いリファクタリングしてみます。

task_status_strategy.py
import flet as ft
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
from features.tasks.task_status import TaskStatus

if TYPE_CHECKING:
    from features.components.progress_bar.task_bar import TaskBar


class TaskStatusStrategy(ABC):
    @abstractmethod
    def update_ui(self, task_bar: "TaskBar"):
        pass


class ProgressingTaskStrategy(TaskStatusStrategy):
    def update_ui(self, task_bar: "TaskBar"):
        task_bar.task_status_text.visible = False
        task_bar.progressing_task_count += 1
        task_bar.progress_bar.value += 1 / task_bar.total_task


class SuccessTaskStrategy(TaskStatusStrategy):
    def update_ui(self, task_bar: "TaskBar"):
        print(task_bar.total_task == task_bar.progressing_task_count)
        if task_bar.total_task == task_bar.progressing_task_count:
            task_bar.progress_bar.value = 1.0
            task_bar.progress_bar.color = ft.colors.LIGHT_GREEN_500
            task_bar.task_message_text.value = "タスクの実行が全て"
            task_bar.task_status_text.value = "完了しました"
        else:
            task_bar.task_status_text.value = "完了 ◯"
        task_bar.task_status_text.visible = True
        task_bar.task_status_text.color = ft.colors.LIGHT_GREEN_500


class FailedTaskStrategy(TaskStatusStrategy):
    def update_ui(self, task_bar: "TaskBar"):
        task_bar.task_status_text.value = "失敗 ✕"
        task_bar.task_status_text.color = ft.colors.RED_500
        task_bar.task_status_text.visible = True
        task_bar.progress_bar.color = ft.colors.RED_500


class TaskStatusStrategyContext:
    def __init__(self):
        self._strategies: dict[TaskStatus, TaskStatusStrategy] = {
            TaskStatus.PROGRESSING: ProgressingTaskStrategy(),
            TaskStatus.SUCCESS: SuccessTaskStrategy(),
            TaskStatus.FAILED: FailedTaskStrategy(),
        }
        self._strategy: TaskStatusStrategy = None

    def set_strategy(self, status: TaskStatus):
        self._strategy = self._strategies.get(status)

    def apply_strategy(self, task_bar: "TaskBar"):
        if self._strategy:
            self._strategy.update_ui(task_bar)

update() 内のcaseに記述された処理をStrategyとして各クラスに切り出しました。
そして、TaskStatusStrategyContextクラスで各Strategyをstatusによって呼び出せるようにしています。
これをTaskBar内で呼び、apply_strategy() で呼び出されたクラスの update_ui() を実行します。

実際にこの処理をTaskBarに組み込むと以下のようになります。

progress_bar_of_task.py
import flet as ft
from features.commons._abstracts.observer import Observer
from features.tasks.task_status import TaskStatus
from features.components.progress_bar.task_bar_strategies.task_status_strategy import (
    TaskStatusStrategyContext,
)


class TaskBar(ft.Column, Observer):
    def __init__(self, total_task: int):
        super().__init__()

        self.total_task = total_task
        self.progressing_task_count = 0

        self._status_strategy_context = TaskStatusStrategyContext()

        # ProgressBarの作成
        self.progress_bar = ft.ProgressBar(
            value=0.0, color=ft.colors.CYAN_500, expand=1
        )

        # タスクの進捗率(0/n)という表示用のText
        self.task_ratio_text = ft.Text(
            value=f"{self.progressing_task_count}/{self.total_task}"
        )

        # タスクのメッセージを表すText
        self.task_message_text = ft.Text(value="タスクの実行を開始します")

        # タスクのステータスを表すText(最初はタスクの実行が無いので非表示)
        self.task_status_text = ft.Text(value="", visible=False)

    def update(self, name: str, status: TaskStatus):
        # 共通で実行する処理
        self.task_message_text.value = f"タスク:{name} が進行中です"
        self.task_ratio_text.value = f"{self.progressing_task_count}/{self.total_task}"

        # 通知されたステータス毎に処理を分ける
        self._status_strategy_context.set_strategy(status)
        self._status_strategy_context.apply_strategy(self)

        # UIの更新
        super().update()

    def build(self):
        self.controls = [
            ft.Row(controls=[self.task_message_text, self.task_status_text]),
            ft.Row(controls=[self.progress_bar, self.task_ratio_text]),
        ]

update() を見てみると、Strategyに処理が委譲されたことで set_strategy()apply_strategy() を呼ぶのみになっています。
こうすることで後々statusに追加があった場合でも update() メソッド内に手を加えずに、新しくTaskStrategyを継承したクラスを作成しTaskStrategyContextに登録することで安定して動作している箇所に手を加えず、機能を追加することが可能です。

おわりに

今回のような実装にすることで、TaskBar以外にもObserverをattachすることでTaskの進捗に関する通知を複数のObserverに送ることが可能です。(同時にLogも出力するなど)
他にも、Taskの処理失敗時に設定の回数再試行するような処理を追加してみたりすると面白いかもしれません。
ここまでご覧いただきありがとうございました。

Discussion