🔀

Pythonのコルーチンをそこはかとなく理解する

2023/10/09に公開

コルーチンの前に..ジェネレータの基本

コルーチンについていきなり説明する前に、その前提知識となるジェネレータについて解説します。

Pythonの用語集からジェネレータの意味を引用します。

(ジェネレータ) generator iterator を返す関数です。 通常の関数に似ていますが、 yield 式を持つ点で異なります。 yield 式は、 for ループで使用できたり、next() 関数で値を 1 つずつ取り出したりできる、値の並びを生成するのに使用されます。

ジェネレータは、イテレーション中に要素を動的に生成するイテレータになります。

特徴としてはyield を使い「値を返す」&「都度処理を中断」することです。

今回はイテレータという言葉は深く触れませんが、民主主義に乾杯というブログの「イテレータってなに?」が参考になります。

def gen():
    yield 1
    yield 2
    yield 3

g = gen()
print(next(g)) # 1 状態を持っている
print(next(g)) # 2
print(next(g)) # 3
print(next(g)) # StopIteration

ジェネレータの主なユースケースとしては、イテラブルなオブジェクトを引数とする関数に対して、ジェネレータを渡すことができます。

nums = [1, 2, 3, 4, 5]
print(sum(x*x for x in nums)) # ジェネレータ内包表記の

イテラブルなオブジェクトとしてはリストが有名です。しかし、リストと違いジェネレータは全体のシーケンスの次の要素を都度生成するため、メモリを節約することができます。

追記: 2023/11/20 ジェネレータの内部動作についてちょこっと

ジェネレータ関数がリストと比較して「重くならない」仕組みとして、内部動作で3つポイントがあります。

  1. 逐次実行: ジェネレータは要求されたときにのみ次の値を計算します。
  2. 状態の保持: ジェネレータは現在の計算状態(ローカル変数、イテレータの位置など)のみを保持します。これにより、ー度に一つの要素のみをメモリに保持します。
  3. メモリの再利用: 一度生成された値は、それがもはや必要なくなればメモリから解放されます。

こういったことにより効率的にメモリを扱うことができます。

内部実装の詳細についてはPyCon APAC 2023 Day 2 #pyconapac_5「Internals of Generators」を見ると良いでしょう。

ジェネレータを使ったコルーチン

本題のコルーチンについて説明します。

Pythonの用語集からコルーチンの意味を引用します。

(コルーチン) コルーチンはサブルーチンのより一般的な形式です。 サブルーチンには決められた地点から入り、別の決められた地点から出ます。 コルーチンには多くの様々な地点から入る、出る、再開することができます。

先ほどのジェネレータの説明ではデータの生成に焦点を当てていますが、ジェネレータはデータの受け渡しにも使うことができます。

言い換えればジェネレータはコルーチンの一種であり、 データの消費と生成の両方を行うことができます。

def coro():
    while True:
        x = yield
        print("x:", x)

c = coro()
type(c) # <class 'generator'>
next(c) # ここでコルーチンが実行される。 c.send(None) と実質同じ。
c.send(1) # x: 1
c.send(2) # x: 2
next(c) # x: None / c.send(None)と実質同じ。

上の例でもわかるようにsendで送られた値をyieldによって受け取ることができます。


ジェネレータ(コルーチン)を管理する

ここまでの実装を振り返ると、ジェネレータは

  • それ自体は単独で実行ができない(for-loopやsendなどで実行する)
  • yieldの箇所で中断し何かしらによって特定の指示がない限り再開しない

という特性があります。

このことから、何かしらの管理主体によって複数のジェネレータの実行を制御するといったことが考えられます。

例えば、ジェネレータを複数用いて並行性(複数のタスクを同時に管理)を実現する等が考えられます。

ここでは並行性を実現し複数のタスクを協調的に実行する例を示します。

import time

def gen(gen_name):
    process_times = 0
    while True:
        print(f"{gen_name}, process_times: {process_times}")
        process_times += 1
        yield
        time.sleep(1)
    
tasks = [gen("gen1"), gen("gen2"), gen("gen3")]

# 管理主体の実装
def run():
    while tasks:
        t = tasks.pop(0) # ジェネレータを取り出す
        try:
            t.send(None) # ここでジェネレータが実行される
            tasks.append(t) # 再度ジェネレータをリストに追加する
        except StopIteration:
            pass

run()

# gen1, process_times: 0
# gen2, process_times: 0
# gen3, process_times: 0
# gen1, process_times: 1
# gen2, process_times: 1
# ...

yield fromを使ってジェネレータ(コルーチン)の処理を委譲する

例えば、関数を定義したい際に、その内部で利用したい関数がジェネレータを生成する場合を考えます。

このとき、以下のような呼び方はできません。

def count_up(n):
    while 10 > n:
      yield n
      n += 1

def count_down(n):
    while 0 < n:
      yield n
      n -= 1

def func():
    count_up(1)
    count_down(10)

前述した通り、ジェネレータは単独で実行することができないためです。

この場合、2つ方法が考えられます。

1つ目の方法は「自前でジェネレータを実行するための処理を書く」です

def func():
    for i in count_up(1):
        yield i
    for i in count_down(10):
        yield i

この場合、自前でジェネレータを実行するための処理を書く必要があります。

加えて、今回の例だと単純ですが、コルーチンを複数用いた場合は実装が複雑になります。

2つ目の方法は「yield fromを使ってジェネレータを委譲する」です

yield fromは Python3.3(PEP 380 – Syntax for Delegating to a Subgenerator)から追加された構文です。

yield fromは、サブジェネレータ(今回はcount_up)が外からの処理を受け付け、その処理結果を返すための構文です。

def func():
    yield from count_up(1)
    yield from count_down(10)

for i in func():
    print(i)
# 1
# 2
# ...
# 10
# 9
# ...

他の例でyield fromを使ってみます。

ここでは、メインのジェネレータからサブジェネレータにデータを送り、サブジェネレータからの返却値をメインのジェネレータから返す例を示します。

def sub_gen():
    while 1:
        x = yield
        yield x + 1

def gen():
    yield from sub_gen()

g = gen() # サブジェネレータが指定された場合は、その返却値が代入される
next(g) # ここでサブジェネレータが実行される
g.send(1)  # gen()にデータを送ると、sub_gen()にデータが送られる
print(next(g)) # 2

上記の例では、yield fromを使うことで、サブジェネレータにデータを送り、 サブジェネレータからの返却値をメインのジェネレータから返すことを実現しています。


コルーチンとしてのasync/await

先ほどの例ではyieldを用いてコルーチンを実装しましたが、Python3.5からはasync/awaitを用いてコルーチンを実装することができます。

asyncを用いることで、コルーチンの宣言ができます。

async def hello(name):
  return f"Hello, {name}!"

h = hello("world") # <coroutine object hello at 0x102518c70> → コルーチンオブジェクトを返す
h.send(None) # StopIteration: Hello, world! → StopIterationによりコルーチンが完了したことを示す

awaitを用いることで、コルーチンの実行を待つことができます。

async def main():
    names = ["world", "python", "async"]
    for name in names:
        g = await hello(name)
        print(g)

上の2つの内容から、

  • async def で定義された関数は、非同期の操作を行うためのコルーチンオブジェクトを返す
  • await を使用して非同期の操作の完了を待ち、その後の処理を再開する

ということがわかります。

async/awaitの非同期IOへの応用

本記事と内容が逸れますが、async/awaitは主に非同期IOを実現するために広く使われています。

例えば、複数のコルーチンの制御を想定した際、「待機」状態(非同期IO処理など)に、他のタスクやコルーチンが実行することができます。

import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

async def goodbye():
    print("Goodbye")
    await asyncio.sleep(2)
    print("Everyone")

async def main():
    await asyncio.gather(hello(), goodbye())

if __name__ == '__main__':
    asyncio.run(main())

# Hello
# Goodbye
# World
# Everyone

ここでは、asyncio.gatherによって複数のコルーチンを同時に、実行し指定されたすべてのコルーチンが完了するのを待ちます。

  • 上記の処理を実行中、asyncioモジュール内部にあるイベントループは、複数のコルーチンを管理するためのキューを持ちます。
  • このキューには、実行待ちのタスクが順番に格納されています。
  • イベントループは、キューからタスクを一つずつ取り出して実行します。
  • コルーチンがawaitを使用して非同期操作を行うと、そのコルーチンは一時的に中断されます。
    • 例えば、1秒や2秒の待機が指示された場合、その期間中にコルーチンは中断されます。
  • この中断中に、イベントループは他の待機中のコルーチンやタスクを実行します。
  • 待機が終了した後、中断されていたコルーチンは再開され、その後の処理が続行されます。

async/awaitのユースケースについては「Python multiprocessing vs threading vs asyncio」を参考にしてください。


参考

Discussion