🌸

Pythonでも(Oneshot) Algebraic Effectsがしたい

2022/08/23に公開
2

成果物

ひとまずこれが動くようになりました。

NotInteger = Effect()
FiveInteger = Effect()


def parse_int(s: str) -> Result[int, str]:
    try:
        return Ok(int(s))
    except:
        return Err("Could not parse int")


def sum_lines(s: str):
    lines = s.split()
    sum = 0
    for line in lines:
        match parse_int(line):
            case Ok(x):
                if x == 5:
                    sum += yield next(FiveInteger.perform(line))
                else:
                    sum += x
            case Err(_):
                sum += yield next(NotInteger.perform(line))
    return sum


if __name__ == "__main__":

    def handle_notinteger(k, v):
        print(f"Handling Error: {v} is not an integer.")
        ret = k(0)
        return ret

    def handle_five(k, v):
        print(f"Handling Five: replacing {v} with 10")
        ret = k(10)
        return ret

    handler = Handler()
    handler.on(NotInteger, handle_notinteger).on(FiveInteger, handle_five)
    total = handler.run(sum_lines, "1\n2\nthree\n4\n5")
    print(total)
    
    """
    output:
    Handling Error: three is not an integer.
    Handling Five: replacing 5 with 10
    17
    """

(コードの例はRustでAlgebraic Effectsをやる記事を参考にしました)

yield nextのせいでちょっと外見がアレですが、例外と違って、エラーの処理を行なった後に「戻ってきている」のがわかると思います。React Hooksが出てきた当初話題になった記事のJSっぽい記法で書くと

try {
  sum += perform NotInteger;
} handle (effect) {
  if (effect === NotInteger) {
    resume with 0;
  }
}

とかです(最初のコード例ではロギングしたり別のエフェクトもハンドルしていたりしますが)。

ちなみにResult, OK, Errこのライブラリを使っています。

やったこと

Ruby向けのAlgebraic EffectsライブラリであるRuff
https://github.com/Nymphium/ruff

をPythonに移植しています。現状のコードは最後に載せます。

参考にした文献

Ruffの解説記事。
https://nymphium.github.io/2019/12/07/ruby-ae.html

asymmetric coroutinesとoneshot algebraic effectsとの関係の解説記事。
https://nymphium.github.io/2018/12/09/asymmetric-coroutinesによるoneshot-algebraic-effectsの実装.html

RustでAlgebraic Effectsやる記事(ユーザー名とmarkdown記法のバッティングでカード表示ができない)。実は当初こちらのライブラリをベースに移植していたのですが、InjectのIndexの推論相当のことをPythonでやる手段が思いつかなかったのでRuffに切り替えました。

現状のコード

まだUncaughtEff相当はありません。

class Eff:
    __match_args__ = (
        "id",
        "args",
    )

    def __init__(self, id, args):
        self.id = id
        self.args = args


class Resend:
    __match_args__ = (
        "eff",
        "k",
    )

    def __init__(self, eff, k):
        self.eff = eff
        self.k = k


class Effect:
    def __init__(self):
        self.id = id(self)

    def perform(self, v):
        yield Eff(self.id, v)


class Handler:
    class Store:
        def __init__(self):
            self.handlers = {}

        def set(self, r, e):
            self.handlers[r.id] = e

        def get(self, eff):
            return self.handlers[eff.id]

        def get_by_id(self, id):
            return self.handlers[id]

        def exists(self, id):
            return id in self.handlers

    def __init__(self):
        self.handlers = self.Store()
        self.value_handler = lambda x: x

    def to(self, fun):
        self.value_handler = fun
        return self

    def on(self, eff, fun):
        self.handlers.set(eff, fun)
        return self

    def run(self, func, args):
        return self.continue_(func(args))(None)

    def set_handlers(self, handlers):
        self.handlers = handlers

    def continue_(self, co):
        def handle(args):
            try:
                return next(self.handle(co, co.send(args)))
            except StopIteration as e:
                return self.value_handler(e.value)

        return handle

    def handle(self, co, r):
        match r:
            case Eff(id, args):
                if self.handlers.exists(id):
                    handler = self.handlers.get_by_id(id)
                    ret = handler(self.continue_(co), args)
                    yield ret
                raise RuntimeError
            case _:
                return self.value_handler(r)

改良版のコード

https://github.com/makenowjust/eff.js

を参考にUncaughtEff対応した。
Pythonのgeneratorはstacklessコルーチンなのでruffをそのまま移植することができず、どうしようかと思っていたが既にJSでやられていることを論文で知った。
APIをうまくどうにか出来たらライブラリ化しようと思う。

class Eff:
    __match_args__ = (
        "id",
        "args",
    )

    def __init__(self, id, args):
        self.id = id
        self.args = args


class Resend:
    __match_args__ = (
        "eff",
        "k",
    )

    def __init__(self, eff, k):
        self.eff = eff
        self.k = k


class Effect:
    def __init__(self):
        self.id = id(self)

    def perform(self, v):
        return Eff(self.id, v)


class Store:
    def __init__(self):
        self.handlers = {}

    def set(self, r, e):
        self.handlers[r.id] = e

    def get(self, eff):
        return self.handlers[eff.id]

    def get_by_id(self, id):
        return self.handlers[id]

    def exists(self, id):
        return id in self.handlers


def handlers(value_handler, store):
    def hander_impl(gf):
        g = gf()

        def continue_(g):
            def continue_impl(arg):
                try:
                    ret = g.send(arg)
                    yield from handles(ret)
                except StopIteration as e:
                    yield from value_handler(e.value)

            return continue_impl

        def rehandles(k):
            def rehandles_impl(arg):
                def f():
                    yield from k(arg)

                yield from handlers(continue_(g), store)(f)

            return rehandles_impl

        def handles(r):
            match r:
                case Eff(id, args):
                    if store.exists(id):
                        effh = store.get_by_id(id)
                        yield from effh(continue_(g), args)
                    yield Resend(r, continue_(g))
                case Resend(eff, k):
                    if store.exists(eff.id):
                        effh = store.get(eff)
                        yield from effh(rehandles(k), eff.args)
                    yield Resend(r, rehandles(k))

        yield from continue_(g)(None)

    return hander_impl


def combineHandlers(handlers):
    if len(handlers) == 0:

        def f(gf):
            yield from gf()

        return f
    h, rhs = handlers[0], handlers[1:]
    rh = combineHandlers(rhs)

    def f2(gf):
        def g3():
            yield from rh(gf)

        yield from h(g3)

    return f2


def execute(g):
    try:
        ret = next(g)
    except StopIteration as e:
        return e.value


write = Effect()

write2 = Effect()


def m():
    yield write.perform("Hello")
    yield write2.perform("Hello")


def ef(k, text):
    print(text)
    yield from k(None)


def ef2(k, text):
    print(f"{text} 2")
    yield from k(None)


def vh(v):
    yield v


store = Store()
store.set(write, ef)

store2 = Store()
store2.set(write2, ef2)

handle_write = handlers(vh, store)
handle_write2 = handlers(vh, store2)
execute(combineHandlers([handle_write, handle_write2])(m))

Discussion