動的計画法ベースの数理最適化ソルバDIDPPyで最短共通超配列問題を解く
はじめに
最近、動的計画法ベースの数理最適化ソルバ DIDPPy を見つけ、様々な実装を試しています。
しかし、日本語での解説記事が少ないため、今回その使用経験を共有することにしました。
題材には、最短共通超配列問題を選びました。
この問題は、espeon011氏から実務で関連する事例として教えていただいたものです。
氏のGitHubリポジトリでは、今回の私の実装に加えて様々なアルゴリズムとの比較や、DNA配列などの問題インスタンスを用いた実験結果が公開されています。
- espeon011氏のGitHubリポジトリ:https://github.com/espeon011/opt_note/tree/main/src/scsp
 
また、本記事は数理最適化勉強会での議論が書くきっかけとなっており、ohken322氏からも助言をいただきました。
本記事では、DIDPPy を用いた SCS の実装とその解説に焦点を当てていきます。
DIDPPy
DIDPPy は動的計画法(Dynamic Programming)の考え方に基づいた数理最適化ソルバです。
その名称は Domain-Independent Dynamic Programming に由来しており、動的計画法で問題を定式化することを通じてソルバに渡せるモデルを与えることになります。このモデルを用いて効率的に解を探索してくれるのが特徴です。
従来の一般的な最適化ソルバとは異なり、DIDPPy を使うにはまず問題を動的計画法としてモデル化する必要があります。そのため、少し難易度の異なるアプローチが求められます。
競技プログラミングなどで動的計画法に慣れている人にとっては、比較的スムーズに使いこなせるツールだと思います。
公式チュートリアルでは、時間制約付き巡回セールスマン問題(TSP)が例として取り上げられており、DIDPPy がどのように問題をモデル化して扱うか解説されています。
最短共通超配列(SCS)
最短共通超配列(SCS; Shortest Common Supersequence) とは、与えられた複数の文字列をすべて部分列として含む、最も短い文字列です。
たとえば、文字列 "AGGTAB" と "GXTXAYB" のSCSは "AGXGTXAYB" となります。この関係は、以下のように視覚的に示すと理解しやすいです。
点線より上の文字列がSCSで、点線より下の文字列は元の文字列がSCS内でどのように出現しているかを示しています。
AGXGTXAYB
---
AG.GT.A.B
.GX.TXAYB
SCS は、文字列の数が 
- 
 の場合: 計算量はm=2  であり、比較的短時間で解けるため LeetCode のようなオンラインプログラミングコンテストでも扱われている問題です。O(n^2)  - 
 の場合: 文字列の数が増えると計算量が爆発的に増加するため、単純な動的計画法では現実的な時間で解くのが困難になります。また、厳密解を求めることも難しいです。今回の記事では、m > 2  が大きいケース、具体的にはn,m  の問題インスタンスを扱います。n=15, m=64  
専門ではないので詳しくないですが、SCSはDNAシーケンシングやデータ圧縮など、さまざまな分野で応用されているようです。
 動的計画法による定式化(m=2  )
まず理解を深めるため、
部分問題として、文字列 
ただし添字は0-indexed で、
先程の例を使うと、
- 
 は "AGGTAB" と "GXTXAYB" のSCSの長さL[0][0]  - 
 は "GGTAB" と "GXTXAYB" のSCSの長さL[1][0]  - 
 は "GGTAB" と "XTXAYB" のSCSの長さL[1][1]  - 
 は ""(空文字列) と "XTXAYB" のSCSの長さ = 6L[6][1]  
となります。
記法として文字列 
- 
 のときs_1[i] = s_2[j] - 先頭の文字が一致しているので、その文字をSCSに含めることで、問題は 
 とs_1  の次の文字からのSCSを求める問題に帰着します。s_2 
したがって となります。L[i][j] = 1 + L[i+1][j+1]  
 - 先頭の文字が一致しているので、その文字をSCSに含めることで、問題は 
 - 
 のときs_1[i] \not= s_2[j] - 先頭の文字が異なるため、SCSにはどちらか一方の文字を含める必要があります。
- 
 を含めた場合、次に考えるべきはs_1[i]  の次の文字とs_1  の現在の文字からのSCSであり、その長さはs_2  です。1 + L[i+1][j]  - 
 を含めた場合、次に考えるべきはs_2[j]  の現在の文字とs_1  の次の文字からのSCSであり、その長さはs_2  です。1 + L[i][j+1]  
 - 
 - SCSは最も短い文字列であるため、この二つの選択肢のうちより短い方を採用します。したがって、
 となります。L[i][j] = 1 + \min(L[i+1][j], L[i][j+1])  
 - 先頭の文字が異なるため、SCSにはどちらか一方の文字を含める必要があります。
 
境界条件(文字列の終端)を含めて記述すると、以下の漸化式が得られます。
元の問題に対するSCSの長さは 
動的計画法による定式化(一般化)
文字列が 
2つの文字列のケースと同様のアプローチで、
このとき、SCSの1文字を決定し、次の状態へ遷移することを考えます。
SCSに追加する文字は、
ここで、
新しい状態のインデックス集合を 
直観的には、SCSに 
SCSは最も短い文字列であるため、すべての可能な遷移の中から最も短い結果を選びます。つまり、SCSに追加する文字 
定式化すると、以下のようになります(境界条件は省略しています)。
最終的な答え(SCSの長さ)は 
このアプローチでは、状態数が 
しかし、DIDPPy ではこの膨大な状態空間をすべて探索するのではなく、ビームサーチなどの手法を用いて効率的に探索するため、大規模な問題にも対応できます。
DIDPPyによるSCS問題のモデル化
これまでの定式化を踏まえ、DIDPPyを使って実際に問題を解いていきます。DIDPPyを扱う上で最低限必要な機能と手順を順を追って説明します。
1. モデルの構築
まず、didppy.Modelクラスのインスタンスを生成します。
ここでは、最小化問題(maximize=False)であり、コストは整数(float_cost=False)であることを指定します。
import didppy as dp
model = dp.Model(maximize=False, float_cost=False)
2. Object Types の定義
変数の型に相当する ObjectType を定義します。
今回の変数は文字列のインデックスであり、その範囲は0から
index_type = model.add_object_type(number=n + 1)
3. 状態変数の定義
状態変数はいくつか種類がありますが、今回は整数型のみを使います。
- Element Variables: 特定の
ObjectTypeに紐づく変数です。model.add_element_varで追加します。 - Resource Variables: コストに関して単調性を持つ変数に用います。
model.add_element_resource_varで追加します。 - IntVar: 型に紐づかない一般的な整数変数です。
 
ドキュメントでは使えるときにはResource Variablesの使用が推奨されていますが、実際には性能が良くならない場合もありました(理由は不明です)。今回はResource Variables を使用します。
また、最終的に求めたいコストとなる状態をtargetで指定します。今回はインデックスがすべて0の状態が最終目標なのでtarget=0と設定します。
indices_vars = [
    model.add_element_resource_var(
        object_type=index_type, target=0, less_is_better=False
    )
    for i in range(m)
]
4. ルックアップテーブルの定義
動的計画法の漸化式で文字列の文字を比較するため、その結果を事前に計算してDIDPPy のルックアップテーブルとして保持します。
これは、DIDPPy モデル内で直接文字列のインデックスアクセスを行う代わりに、定数配列として扱うためのものです。
具体的には、各文字列の組み合わせ 
このテーブルは、各組み合わせに対してサイズ 
A = [[None for _ in range(m)] for _ in range(m)]
for p in range(m):
    for q in range(m):
        arr: list[list[int]] = []
        for i in range(n + 1):
            row = []
            for j in range(n + 1):
                row.append(
                    1 if i < n and j < n and inputs[p][i] == inputs[q][j] else 0
                )
            arr.append(row)
        A[p][q] = model.add_element_table(arr)
5. 遷移の定義
遷移はTransitionクラスのインスタンスとして定義し、model.add_transitionでモデルに追加します。引数の意味は以下のとおりです。
- 
name: 遷移を識別するための名前。 - 
cost: 遷移によって変化するコスト。今回は1文字追加するごとにコストが1増えるため、1 + dp.IntExpr.state_cost()とします。 - 
effects: 状態変数がどのように変化するかを定義します。今回の状態変数は であり、更新後はi_q  となります。i^{(p)}_q = i_q + A_{pq}[i_p,i_q]  - 
preconditions: 遷移が実行されるための条件。今回は、遷移元の文字列のインデックスが終端に達していないこと ( ) が必要です。i_q < n  
for p in range(m):
    # p番目の文字列に従って進める
    trans = dp.Transition(
        name=f"trans_{p}",
        cost=1 + dp.IntExpr.state_cost(),
        effects=[
            (
                indices_vars[q],
                indices_vars[q] + A[p][q][indices_vars[p], indices_vars[q]],
            )
            for q in range(m)
        ],
        preconditions=[indices_vars[p] < n],
    )
    model.add_transition(trans)
6. Base Case の定義
これ以上遷移が起こらない、つまり探索の終点となる条件を指定します。
今回は、すべての文字列のインデックスが終端に達したとき、すなわち 
model.add_base_case([indices_vars[i] == n for i in range(m)])
7. ソルバの実行
最後に、定義したモデルをソルバに渡し、探索を実行します。
いくつかのアルゴリズムが用意されていますが、特に指定がない場合はCABSが推奨されています。ただし、CABSは定式化に特定の条件が求められるため、注意が必要です。ソルバには、time_limit(実行時間の上限)やスレッド数などのパラメータを指定できます。
ソルバから返ってくるsolutionオブジェクトには、見つかった最適解に関する情報(コストや遷移の系列)が含まれています。
解そのもの(この場合はSCS文字列)を復元するには、この遷移のリストを辿る必要がありますが、その詳細はここでは省略します。
solver = dp.CABS(model, time_limit=60)
solution = solver.search()
# 状態遷移を表示する
# print(solution.transitions)
ここまでの設定で、ソルバを動かす準備が整いました。
しかし、この実装はまだ性能に課題があるため、その改善策について考察します。
Dual Bound
DIDPPyの性能を最大限に引き出す上で、dual bound の設定が重要とされています。
dual boundとは、(最小化問題では)ある状態から得られる解のコストに対する下界のことです。
つまり、「現在の状態からのコストは、少なくともこの値以上になる」という保証を意味します。
今回の例だと自明な下界は0ですが、DIDPPyではたとえ自明な下界であっても設定することが推奨されています。
よりタイトな下界を与えるほど、ソルバは無駄な探索を枝刈りできるため、一般的には性能が向上します。
ここでは、シンプルな下界を一つ実装します。
現在の状態を 
この状態からのSCSの長さの下界を求めるために、2つの文字列に注目します。
- 文字列のペアを選択: 与えられた 
 個の文字列のうち、任意の2つm  とs_p  を選びます。s_q  - 2文字列のSCSを計算: この2つの文字列 
 のs_p  文字目以降とi_p  のs_q  文字目以降からなるSCSの長さを計算します。これは、i_q  の場合の動的計画法を用いることで、m=2  の時間で求めることができます。O(n^2)  - 最大値を下界とする: すべての文字列のペア 
 についてこのSCSの長さを計算し、その最大値をdual boundとして採用します。(s_p, s_q)  
より複雑でタイトな下界を考案することも可能ですが、下界の計算に時間がかかりすぎると、そのメリットを打ち消してしまうことがあります。
下界の計算時間がソルバの探索効率を上回ると、かえって性能が悪化します。実際、よりタイトな下界を与える方法も実装しましたが、性能は悪化してしまいました。
このあたりはトレードオフで考えないといけないポイントです。
実際に、アルファベット26種類、文字列長 
- デュアルバウンドなし: SCSの長さは350
 - 今回のデュアルバウンドを使用: SCSの長さは317
 
同じ定式化でもdual boundがソルバの性能に大きな影響を与える例となっています。
expr = 0
for j in range(m):
    for i in range(j):
        # m=2 の場合の動的計画法を行い、テーブルを構築
        dp_sol = Dp2NaiveAlgo()
        dp_sol.shortestCommonSupersequence(inputs[i], inputs[j]) 
        # dpテーブルを登録
        table_i_j = model.add_int_table(dp_sol.dp)
        expr = dp.max(expr, table_i_j[indices_vars[i], indices_vars[j]])
model.add_dual_bound(expr)
また、実行結果の一例も示します。
経過時間が増えるにつれて、ビームサーチで探索した状態数が増え、また解のbound が更新されていることが確認できます。
Solver: CABS from DIDPPy v0.10.0
Searched with beam size: 1, expanded: 321, elapsed time: 0.400949033
New dual bound: 30, expanded: 321, elapsed time: 0.400973638
New primal bound: 321, expanded: 321, elapsed time: 0.400974932
Searched with beam size: 2, expanded: 954, elapsed time: 1.196479774
Searched with beam size: 4, expanded: 2219, elapsed time: 2.766163895
New primal bound: 317, expanded: 2219, elapsed time: 2.766646067
Searched with beam size: 8, expanded: 4721, elapsed time: 5.985171936
New primal bound: 314, expanded: 4721, elapsed time: 5.986287272
Searched with beam size: 16, expanded: 7231, elapsed time: 10.000514397
Reached time limit.
まとめ
本記事では、動的計画法ベースの数理最適化ソルバDIDPPy を用いて、最短共通超配列問題(SCS)を解く方法について解説しました。
実際に動的計画法による定式化をもとにしてDIDPPyへの実装を行いました。そして
またソルバの性能を向上させるため、DIDPPyでは定式化だけでなくdual boundを考えることも重要になります。
この解説が、DIDPPyや動的計画法を用いた最適化問題に興味を持つ方々の助けになれば嬉しいです。
興味を持たれた方はぜひ、他の問題にも応用してみてください!
コード全体
def solve_with_didppy_n(inputs: list[str], bound_option=-1, **kwargs) -> str:
    """
    DIDPPyを用いたSCS問題の解法
    Args:
        inputs (list[str]): 入力文字列のリスト
        bound_option (int, optional): 下界(dual bound)の設定方法
            - 0: 文字列の長さを用いた簡単なもの
            - 1(default): すべての2入力の組み合わせでSCSを解くもの
            - 2: アルファベットの使用数を用いるもの
        **kwargs: DIDPPYソルバーへの追加パラメータ
    Returns:
        str: 入力全てを部分列として含む最短共通超列(SCS)
    """
    m = len(inputs)
    n = len(inputs[0])
    model = dp.Model(maximize=False, float_cost=False)
    index_type = model.add_object_type(number=n + 1)
    # dp[i_1,...,i_m] := 文字列1のi_1文字目以降、...、文字列mのi_m文字目以降を使うときのSCSの長さ
    indices_vars = [
        model.add_element_resource_var(
            object_type=index_type, target=0, less_is_better=False
        )
        for i in range(m)
    ]
    # (table[s][t])[i,j] (s,t < m, i <= ns[s], j <= ns[t]) := 文字列s,tのi番目とj番目が同じとき1 o.w. 0
    A = [[None for _ in range(m)] for _ in range(m)]
    for p in range(m):
        for q in range(m):
            arr: list[list[int]] = []
            for i in range(n + 1):
                row = []
                for j in range(n + 1):
                    row.append(
                        1 if i < n and j < n and inputs[p][i] == inputs[q][j] else 0
                    )
                arr.append(row)
            A[p][q] = model.add_element_table(arr)
    model.add_base_case([indices_vars[i] == n for i in range(m)])
    for p in range(m):
        # p番目の文字列に従って進める
        trans = dp.Transition(
            name=f"trans_{p}",
            cost=1 + dp.IntExpr.state_cost(),
            effects=[
                (
                    indices_vars[q],
                    indices_vars[q] + A[p][q][indices_vars[p], indices_vars[q]],
                )
                for q in range(m)
            ],
            preconditions=[indices_vars[p] < n],
        )
        model.add_transition(trans)
    def dp_reduction(dp_op, exprs):
        l = 1
        n = len(exprs)
        while l < n:
            for i in range(0, n, l * 2):
                if i + l < n:
                    exprs[i] = dp_op(exprs[i], exprs[i + l])
            l *= 2
        return exprs[0]
    if bound_option == 0:
        # 残っている文字数が最長のものが下限
        exprs = []
        for i in range(m):
            min_to = model.add_int_table([n - j for j in range(n + 1)])
            expr = min_to[indices_vars[i]]
            exprs.append(expr)
        model.add_dual_bound(dp_reduction(dp.max, exprs))
    elif bound_option == 1:
        # 2つの文字列i,jのSCSのうち最長のものが下限
        exprs = []
        for j in range(m):
            for i in range(j):
                dp_sol = Dp2NaiveAlgo()
                dp_sol.shortestCommonSupersequence(inputs[i], inputs[j])
                table_i_j = model.add_int_table(dp_sol.dp)
                exprs.append(table_i_j[indices_vars[i], indices_vars[j]])
        expr = dp_reduction(dp.max, exprs)
        model.add_dual_bound(expr)
    elif bound_option == 2:
        # bound = sum_{a \in Alphabets} max_{0 <= i < m}{i番目の文字列の残っているものでアルファベットaの個数}
        alphabets: set[str] = set()
        for s in inputs:
            for c in s:
                alphabets.add(c)
        alphabets = list(alphabets)
        A = len(alphabets)
        # alpha_counts[alphabet][s_i][idx]
        alpha_counts: list[list[list[int]]] = [
            [[0 for _ in range(n + 1)] for i in range(m)] for _ in range(A)
        ]
        for a_i, c in enumerate(alphabets):
            for s_i, s in enumerate(inputs):
                alpha_counts[a_i][s_i][n] = 0
                for i in range(n - 1, -1, -1):
                    alpha_counts[a_i][s_i][i] = alpha_counts[a_i][s_i][i + 1] + (
                        1 if s[i] == c else 0
                    )
        bound = 0
        for a_i in range(A):
            exprs = []
            for s_i in range(m):
                table_i = model.add_int_table(alpha_counts[a_i][s_i])
                exprs.append(table_i[indices_vars[s_i]])
            expr = dp_reduction(dp.max, exprs)
            bound += expr
        model.add_dual_bound(bound)
    else:
        print(f"unknown bound_option {bound_option}")
    # solve
    solver = dp.CABS(model, **kwargs)
    solution = solver.search()
    idx = [0 for _ in range(m)]
    ret = ""
    for t in solution.transitions:
        if t.name.startswith("trans_"):
            i = int(t.name.lstrip("trans_"))
            c = inputs[i][idx[i]]
            for j in range(m):
                if idx[j] < n and inputs[j][idx[j]] == c:
                    idx[j] += 1
            ret += c
    return ret
Discussion