🎄

Haskell で、優先度付きキューを使ったダイクストラ法

2022/12/24に公開

Haskellのカレンダー | Advent Calendar 2022 - Qiita に参加させていただきます!

突然ですが Haskell でダイクストラ法を実装します。

ダイクストラ法は重み付きグラフで最短経路問題を解くアルゴリズムのひとつです。ダイクストラ法 - Wikipedia に詳しい解説があります。

ダイクストラ法は、重み付きグラフにおいて、その重みに負の値がない・・・つまり重みが正であることを前提にしています。この構造上の仮定によって、貪欲的手法を取ることができるのがその特徴で、結果ベルマン・フォード法などの汎用的なアルゴリズムよりも計算量的に有利になります。

ダイクストラ法では、始点から各頂点への到達コストを最初に \infty と置いて、そこから緩和操作によって徐々にそれらを最適コストまで収束させていくわけですが、このとき グラフの頂点集合からその時点で最小のコストをもつ頂点を選び隣接する頂点を緩和するとことを繰り返す だけで最適解に収束します。ここが、構造上の仮定を置いたことによる貪欲的手法のポイントになのだと思います。

ダイクストラ法を実装するにあたり、この「頂点集合から最少のコストを持つ頂点を選ぶ」のに優先度付きキューを使うことで効率的な計算が可能であることが良く知られています。

Haskell でダイクストラ法を実装

競技プログラミングの鉄則 | マイナビブックス が AtCoder 上に用意しているダイクストラ法の問題があり、それを解くのを目的に、 Haskell で実装しました。

A64 - Shortest Path 2

非負の重み付きグラフが入力で与えられる中、任意の頂点までの最短経路長を求める問題です。

さて、優先度つきキューはどうしようかなと思ったのですが Haskell-jp wiki - データ構造列伝 に載っていた、ヒープの実装である Data.Heap を使いました。Data.Heap は本稿執筆時点の AtCoder でも利用できました。

Data.Heap

Data.Heap は以下のように使います。

import qualified Data.Heap as Heap

main :: IO ()
main = do
  let heap = Heap.fromList ([1, 3] :: [Int])
      heap' = Heap.insert 2 heap

  print heap' -- fromList [1,2,3]

始めに [1, 3] という二つの値でヒープを構築して、そこに 2 を追加しましたが、内部では [1, 2, 3] に整列されてそれが保持されているのが分かります。

Heap をキューとして使う

Data.Heap には uncons 関数があり、これを使うとヒープの中からその時点で最小の値を取り出すことができます。返値は Maybe で、ヒープが空なら Nothing 、値があればその値と更新されたヒープのペアが返ります。

import qualified Data.Heap as Heap

main :: IO ()
main = do
  let heap = Heap.fromList ([1, 3] :: [Int])
      heap' = Heap.insert 2 heap

  print $ Heap.uncons heap' -- Just (1,fromList [2,3])

リスト処理をする際と同様、再帰で uncons をパターンマッチすることによりヒープを優先度つきキューと見立てて、利用することができます。

import qualified Data.Heap as Heap

-- パターンマッチで dequeue する
f :: Maybe (a, Heap.Heap a) -> [a] -> [a]
f Nothing xs = xs
f (Just (x, q)) xs = f (Heap.uncons q) (x : xs)

main :: IO ()
main = do
  let heap = Heap.fromList ([1, 3] :: [Int])
      heap' = Heap.insert 2 heap

  print $ f (Heap.uncons heap') [] -- [3, 2, 1]

なお Data.Heap のヒープは Bootstrapped skew binomial heap というデータ構造の実装で、キューのエンキュー、デキューにそれぞれ相当する操作は insertO(1)uncons

Provides both O(1) access to the minimum element and O(log n) access to the remainder of the heap.

とのことです。十分に高速です。

ヒープには整数だけでなく、任意のデータ構造を扱わせることができます。Data.Heap が提供する Entry 型を使うと優先順位つきのデータ構造をヒープで扱うことができます。

import Data.Heap (Entry (Entry), payload, priority)
import qualified Data.Heap as Heap

f :: Maybe (a, Heap.Heap a) -> [a] -> [a]
f Nothing xs = xs
f (Just (x, q)) xs = f (Heap.uncons q) (x : xs)

main :: IO ()
main = do
  let entries =
        [ Entry {priority = 2 :: Int, payload = "Rust"},
          Entry {priority = 1, payload = "Haskell"},
          Entry {priority = 3, payload = "TypeScript"},
          Entry {priority = 3, payload = "Python"}
        ]

  let q = Heap.fromList entries
  print $ map payload $ f (Heap.uncons q) [] -- ["Python","TypeScript","Rust","Haskell"]

Data.Heap を使ってダイクストラ法を実装する

これで下準備が整いました。Data.Heap を使ってダイクストラ法を実装しました。
イミュータブルに実装したかったので、ダイクストラ法の計算結果は Data.IntMap.Strict に集めています。

import Control.Monad (replicateM)
import Data.Array.IArray (Array, accumArray, (!))
import qualified Data.ByteString.Char8 as BS
import Data.Char (isSpace)
import Data.Heap (Entry (Entry))
import qualified Data.Heap as Heap
import qualified Data.IntMap.Strict as IM
import qualified Data.IntSet as IS
import Data.List (foldl', unfoldr)

getInts :: IO [Int]
getInts = unfoldr (BS.readInt . BS.dropWhile isSpace) <$> BS.getLine

type Vertex = Int
type Weight = Int
type Edge = (Vertex, Weight)
type Graph = Array Vertex [Edge]
type Node = Entry Int Vertex -- コスト (= priority) / 頂点番号 (payload)
type Queue = Heap.Heap Node -- Data.Heap による優先度付きキュー
type Dist = IM.IntMap Int -- キーが頂点番号 / Int は最少コスト

graph :: (Int, Int) -> [[Int]] -> Graph
graph (i, n) uvs = accumArray (flip (:)) [] (i, n) xs
  where
    xs = concatMap (\[u, v, w] -> [(u, (v, w)), (v, (u, w))]) uvs

dijkstra :: Graph -> Int -> Vertex -> Dist
dijkstra g n v0 = f (Just (start, Heap.empty)) IS.empty dist0
  where
    start :: Node
    start = Entry 0 v0

    dist0 :: Dist
    dist0 = IM.fromList $ (v0, 0) : [(k, maxBound :: Int) | k <- [2 .. n]]

    -- パターンマッチでキューから、その時点でコストが最少の頂点を取得し、緩和を再帰で繰り返す
    f :: Maybe (Node, Queue) -> IS.IntSet -> Dist -> Dist
    f Nothing _ dist = dist
    f (Just (Entry currentCost i, q)) done dist
      | IS.member i done = f (Heap.uncons q) done dist
      | otherwise = f (Heap.uncons q') done' dist'
      where
        done' = IS.insert i done
	
	-- キューから取得した頂点に隣接する辺に対して緩和を行う
        edges = g ! i
        (q', dist') = foldl' (relax currentCost) (q, dist) edges

    relax :: Int -> (Queue, Dist) -> Edge -> (Queue, Dist)
    relax currentCost (q, dist) (v, w) = (q', dist')
      where
        cost = currentCost + w
        dist' = IM.adjust (`min` cost) v dist
	
	-- 緩和の結果、コストが更新された場合はキューに入れる
	-- 更新されない場合は最適解に収束したので何もしない
        q'
          | cost < dist IM.! v = Heap.insert (Entry cost v) q
          | otherwise = q

main :: IO ()
main = do
  [n, m] <- getInts
  uvs <- replicateM m getInts

  let g = graph (1, n) uvs
      dist = dijkstra g n 1

  mapM_ (\(_, w) -> print $ if w /= (maxBound :: Int) then w else -1) (IM.toAscList dist)

これで無事 AC しました。

Data.Set で優先度付きキュー

そういえばアルゴ式にもダイクストラ法の問題があったなと思い出しました。

Q11. ダイクストラ法の高速化 | アルゴ式

せっかく実装つくったしこちらも AC しておくか・・・と思い実装を投げたらランタイムエラー。アルゴ式では Data.Heap が使えませんでした。なるほど。

他にもアルゴ式では、 AtCoder では使える Data.Vector が使えなかったりと多少制限がかかりますが、より標準的なライブラリのみで実装する訓練になるので悪くないなと思っています。

もとい、優先度つきキューをどうしようかなと。

やりたいことは「集合の中から最少の値を持ってくる」ということなので Haskell の標準ライブラリ (といっていいんですかね?) の中から似たような操作が現実的な計算量で行えるものをチョイスすればよさそうです。結論、おなじみの Data.Set を使いました。

Data.Set には minView という関数があり、これを使うと Data.Heap の uncons 同様、集合の中から最小値を取得しつつ残った集合を得ることが可能です。返り値が Maybe なのも同じくです。

余談ですが Data.Heapuncons には別名で viewMin という名前がついてます。 Set の方は minView で Heap の方は viewMin 惜しい。

また Data.Map や Data.IntMap などにも同様の minView があります。Map の minView はキーの値をもとに最小値を得るわけですが、キーは当然重複を許しません。ダイクストラ法では「コストが最少」を計算するので、重複値が取れない Map は向いていません。

閑話休題 Data.Set で優先付きキューです。

Data.Heap のとき同様の例として、以下のように使うことができます。

import qualified Data.Set as Set

f :: Maybe (a, Set.Set a) -> [a] -> [a]
f Nothing xs = xs
f (Just (x, q)) xs = f (Set.minView q) (x : xs)

main :: IO ()
main = do
  let entries = [(2 :: Int, "Rust"), (1, "Haskell"), (3, "TypeScript"), (3, "Python")]

  let q = Set.fromList entries
  print $ f (Set.minView q) [] -- [(3,"TypeScript"),(3,"Python"),(2,"Rust"),(1,"Haskell")]

Set で扱う値をタプルにし、その先頭に優先度を持ってくることで minView と組み合わせたときに優先順位をうまく加味した操作になる・・・というところがポイントになります。

ドキュメントによれば計算量は insertO(log \space n)minViewO(log \space n) です。Data.Heap よりも計算量的には劣りますが、今回の目的には十分でしょう。

Data.Set を使ってダイクストラ法を実装する

さて、準備が整ったので、先の実装を Data.Set に入れ換えて実装します。

import Control.Monad (replicateM)
import Data.Array.IArray (Array, IArray (bounds), accumArray, (!))
import qualified Data.ByteString.Char8 as BS
import Data.Char (isSpace)
import qualified Data.IntMap.Strict as IM
import qualified Data.IntSet as IS
import Data.List (foldl', unfoldr)
import qualified Data.Set as Set

getInts :: IO [Int]
getInts = unfoldr (BS.readInt . BS.dropWhile isSpace) <$> BS.getLine

type Vertex = Int
type Weight = Int
type Edge = (Vertex, Weight)
type Graph = Array Vertex [Edge]
type Entry = (Int, Vertex) -- キューへのエントリ: (コスト, 頂点番号)

-- アルゴ式は Data.Heap が使えないので代わりに Set を使う
type Queue = Set.Set Entry -- 優先度付きキュー ・・・ キーが優先度 (コスト) / 値は頂点番号
type Dist = IM.IntMap Int -- キーが頂点番号 / Int は最少コスト

graph :: (Int, Int) -> [[Int]] -> Graph
graph (i, n) uvs = accumArray (flip (:)) [] (i, n) xs
  where
    xs = concatMap (\[u, v, w] -> [(u, (v, w))]) uvs

dijkstra :: Graph -> Vertex -> Dist
dijkstra g v0 = f (Just (start, Set.empty)) IS.empty dist0
  where
    start :: Entry
    start = (0, v0)

    dist0 :: Dist
    dist0 = IM.fromList $ (v0, 0) : [(k, maxBound :: Int) | k <- [s .. e], k /= v0]
      where
        (s, e) = bounds g

    f :: Maybe (Entry, Queue) -> IS.IntSet -> Dist -> Dist
    f Nothing _ dist = dist
    f (Just ((currentCost, i), q)) done dist
      | IS.member i done = f (Set.minView q) done dist
      | otherwise = f (Set.minView q') done' dist'
      where
        done' = IS.insert i done
        edges = g ! i
        (q', dist') = foldl' (relax currentCost) (q, dist) edges

    relax :: Int -> (Queue, Dist) -> Edge -> (Queue, Dist)
    relax currentCost (q, dist) (v, w) = (q', dist')
      where
        cost = currentCost + w
        dist' = IM.adjust (`min` cost) v dist
        q'
          | cost < dist IM.! v = Set.insert (cost, v) q
          | otherwise = q

main :: IO ()
main = do
  [n, m] <- getInts
  uvs <- replicateM m getInts

  let g = graph (0, n - 1) uvs
      dist = dijkstra g 0

  mapM_ (\(_, w) -> print $ if w /= (maxBound :: Int) then w else -1) (IM.toAscList dist)

これで AC しました。

Data.Heap だったところが Data.Set に変わったのみ。

ここまできたら Data.Heap 版と Data.Set 版とを同一の実装で済ませられるよう、高階関数で DI する実装にしようかと思いましたが、晩ご飯のケンタッキーフライドチキンがそろそろ届くので時間切れです。宿題にするとしましょう。

メリークリスマス、良い Haskell ライフを。

2023.8.1

改めて今読み返してみると上記の実装はちょっと冗長な部分があるのと、イミュータブルなデータ構造を使っているので遅いですね。改良版の実装を以下に貼っておきます。

ダイクストラ法を使う問題である、競プロ典型90の 013 - Passing(★5) への回答です。

{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -O2 #-}

import Control.Monad
import Data.Array.IArray
import Data.Array.ST
import Data.Array.Unboxed (UArray)
import qualified Data.ByteString.Char8 as BS
import Data.Char (isSpace)
import Data.Heap (Entry (Entry))
import qualified Data.Heap as Heap
import Data.List

main :: IO ()
main = do
  [n, m] <- getInts
  uvs <- replicateM m $ do
    [u, v, w] <- getInts
    return ((u, v), w)

  let g = wGraph2 (1, n) uvs
      dist1 = dijkstra (g !) (bounds g) 1
      distN = dijkstra (g !) (bounds g) n

  mapM_ (\k -> print $ dist1 ! k + distN ! k) [1 .. n]

{-- Library --}

-- ダイクストラ法
dijkstra :: Ix v => (v -> [(v, Int)]) -> (v, v) -> [v] -> UArray v Int
dijkstra nextStates (l, u_) v0s = runSTUArray $ do
  dist <- newArray (l, u_) maxBound

  forM_ v0s $ \v -> do
    writeArray dist v 0

  let queue = Heap.fromList $ map (Entry 0) v0s

  aux (Heap.uncons queue) dist
  return dist
  where
    aux Nothing _ = return ()
    aux (Just (Entry dv v, queue)) dist = do
      -- dv > dist v means (dv, v) is garbage
      garbage <- (dv >) <$> readArray dist v

      if garbage
        then aux (Heap.uncons queue) dist -- skip
        else do
          queue' <-
            foldM
              ( \q (u, w) -> do
                  du <- readArray dist u

                  let dv' = dv + w

                  if dv' < du
                    then do
                      writeArray dist u dv'
                      return $ Heap.insert (Entry dv' u) q
                    else return q
              )
              queue
              (nextStates v)

          aux (Heap.uncons queue') dist
	  
-- 重み付きの辺から無向グラフの隣接リストを構築する
-- >>> wGraph2 (1, 3) [((1, 2), 100), ((2, 3), 5)]
-- array (1,3) [(1,[(2,100)]),(2,[(3,5),(1,100)]),(3,[(2,5)])]
wGraph2 :: (Ix v, Num e) => (v, v) -> [((v, v), e)] -> Array v [(v, e)]
wGraph2 (lb, ub) uvs = accumArray (flip (:)) [] (lb, ub) xs
  where
    xs = concatMap (\((u, v), w) -> [(u, (v, w)), (v, (u, w))]) uvs

getInts :: IO [Int]
getInts = unfoldr (BS.readInt . BS.dropWhile isSpace) <$> BS.getLine

Discussion