🧘

Pythonの非同期プログラミングを完全理解

2023/02/13に公開

非同期プログラミング

非同期プログラミングについて、聞いたことのある人は多いと思います。例えば、フロントエンドで使われているJavaScriptはシングルスレッド言語で、メインスレッドをブロッキングさせないため、様々な関数は非同期処理になるよう実装されています。Node.jsもその性質を受け継ぎ、I/Oバウンドタスクに長けています。しかし、Pythonになると、並列・並行処理に対応しているため、ほとんどの人は自身のプロジェクトで非同期プログラミングを利用した経験がないでしょう。もちろん、TornadoTwistedGeventなどの非同期フレームワークが有名で使ったことのある人は少なくないですが、変わったエラーにぶつかった時はなかなか解決できないでしょう。

近年のPyConの傾向から見れば分かりますが、非同期プログラミングは間違いなくPythonエコシステムの次のトレンドになります。また、新興のプログラミング言語であるGo、Rustなどは非同期処理と高性能並行処理を売りにしています。Pythonも負けてはならないので、2013年からPythonの生みの親であるGuidoさん自らTulip(asyncio)プロジェクトの開発を行い始めました。

1. 非同期処理とは

まず、関連するコンセプトを説明してから、非同期処理の説明に入りたいと思います。

1-1. ブロッキング

  • プログラムは計算リソースを得られずハングアップになった状態
  • プログラムはある処理が完成するのを待っている時、自身は他の処理を行えない状態をブロッキングという
  • よく見られるブロッキングの形式:ネットワークI/Oブロッキング、ハードディスクI/Oブロッキング、ユーザー入力ブロッキング

ブロッキングはあらゆるところに存在します。例えば、CPUが「コンテキストスイッチ」をしている時、他のプロセスは処理を行えないため、ブロッキングされます。(マルチコアの場合は、コンテキストスイッチをしているコアが利用不可になる)

1-2. ノンブロッキング

  • プログラムはある処理を待つ時、自身はブロッキングされず、他の処理に移れる状態をノンブロッキングという
  • ノンブロッキングは全てのプログラムレベルと状況で可能なものではない
  • プログラムのカプセル化レベルにおいて、子のプログラムユニットを許容できる時(「不可分処理」ではない時)のみ、ノンブロッキング状態が存在しうる

ノンブロッキングはブロッキングの反対面で、ある処理のブロッキングが計算効率を低下させたため、その処理をノンブロッキングにするわけです。

1-3. 同期処理

  • 異なるプログラムユニットはあるタスクを完成するため、実行過程において何らかの通信方式で一貫性を保持している時、これらのプログラムユニットを同期処理という
  • 例えば、ECサイトのDBで商品在庫の更新において、「行ロック」を信号として、それぞれの更新リクエストに強制的に順序付ける時、この在庫更新の処理を同期処理と言える
  • すなわち、同期処理は順番のある処理と言える

1-4. 非同期処理

  • 異なるプログラムユニットはお互い通信しなくても、あるタスクを処理できる方式
  • 関連性のないプログラムユニット間は非同期である
  • 例えば、Webページをスクレイピングする場合、スケジューリングプログラムがダウンロード用のプログラムを呼び出したら、そのダウンロードタスクとの通信を保持する必要なく、すぐ他のタスク用のプログラムを呼び出すことができます。また、異なるWebページのダウンロードなどの処理はお互い関連性がないため、通知し合う必要もありません。ただし、これらの非同期処理は完成時間が未知です。
  • すなわち、非同期処理は順番のない処理と言える

上記の通信方式というのは非同期処理と並行処理においての「synchronization primitive」を指します。例えば、セマフォ、ロック、同期キューなどがあります。これらの通信方式は、複数のプログラムをある条件の下で同期処理を行うためのものです。そして、非同期処理があるからこそ、これらの通信方式が必要になるわけです。理由として、全てのプログラムが同期処理を行うのなら、もともと順番に処理されるため、通信方式なんかは必要ないからです。

1-5. 並行処理

  • 並行処理はプログラムの構造で、プログラムを複数の独立で実行できる子タスクとして設計することを指す
  • 有限の計算リソースを利用し、マルチタスクを同時処理、または近似的に同時処理を行うのが目的

1-6. 並列処理

  • 並列処理はプログラムの実行状態で、マルチタスクが同時処理されることを指す
  • 余分の計算リソース(マルチコアCPU)を利用し、マルチタスクの実行を加速させるのが目的

並列・並行処理については、前回の記事を参照してください。

1-7. コンセプトのまとめ

  • 並列処理はマルチコアCPUを利用して、マルチタスクを加速させるためのもの
  • 並行処理は独立した子タスクをできるだけ速く実行するためのもので、全体の処理を加速できるとは限らない
  • ノンブロッキングはプログラムの実行効率を向上させるためのもの
  • 非同期処理は効率よくノンブロッキングタスクを管理するためのもの

並行処理をサポートするためにプログラムをマルチタスクに分割する必要があります。それぞれのタスクに対して、ブロッキング・ノンブロッキング、同期・非同期が定義されます。そのため、並行、非同期、ノンブロッキングの三者は密接した関係性を持っています。

1-8. 非同期プログラミング

同期・非同期、ブロッキング・ノンブロッキングは相容れないものではありません。例えば、ECサイトはマルチユーザーのアクセスリクエストに対して非同期処理を行うが、在庫の更新は同期処理である必要があります。

1-9. 非同期プログラミングの難点

  • コントロールが効かない。理由は、実行順序は予測不可能だから。並行処理を加えるとさらに難しくなる

そのため、非同期フレームワークのほとんどは非同期プログラミングモデルをシンプル化(一度に1つのイベントしか処理を許容しない)しています。非同期プログラミングについての議論はシングルスレッドのものに集中しています。

  • あるイベントを処理するプログラムは長い計算時間を必要とするとき、他の部分は全部ブロッキングされる

そのため、非同期プログラミングを採用したら、各非同期呼び出しを小さくしないとだめです。ここの「小さく」というのは計算時間を短くすることです。そして、プログラムをどのように非同期タスクに分割するのが難題になります。

  • プログラムの次のステップは前のステップの結果に依存することが多い。非同期呼び出しの時、その結果または完成しているのかを知る方法が必要
  • コールバックは必然になるが、「コールバック地獄 (callbackhell)」に陥って苦痛
  • 同期プログラミングのコードを非同期プログラミングに変更すると、コードの構造は必然的にぶっ壊れる
  • 問題を解決するためのシンキングも変えなければならない。ただただ突っ走るのではなく、細心の注意を払って非同期タスクを配置する必要がある

2. 非同期プログラミングを採用する理由

前述のように、非同期プログラミングにはたくさんの難点があります。Pythonの生みの親が4年もかけて自ら取り組んで作った「asyncio」はPython 3.6で標準ライブラリーになったが、なんであんな苦労してまで作らなければならないでしょうか?理由は、非同期プログラミングは非常に役に立つものだからです。

2-1. CPUの時間感覚

CPUのクロック数を2.6Ghzと仮定し,つまり1秒あたり2.6\times10^9の命令処理が可能で、命令処理ごとにかかる時間は0.38nsになります。0.38nsがCPUの時間感覚において最小単位になります。そして、人間の時間感覚の最小単位を1sとします。下の表は、人間の時間感覚をベースにCPUの時間感覚を計算したものになります。また、CPUの遅延データは「Latency numbers every programmer should know」を参照しています。

処理 実際の遅延 CPUの時間感覚
命令処理 0.38ns 1s
L1キャッシュの読み込み 0.5ns 1.3s
分岐予測修正 5ns 13s
L2キャッシュの読み込み 7ns 18.2s
排他制御 25ns 1m5s
メモリー参照 100ns 4m20s
コンテキストスイッチ 1.5µs 1h5m
1Gbpsのネットワークで2KBのデータをアップロード 20µs 14.4h
メモリから1MBの連続データを読み込む 250µs 7.5day
同じインターネットデータセンターのホストにpingする(往復) 0.5ms 15day
SSDから1MBの連続データを読み込む 1ms 1month
ハードディスクから1MBの連続データを読み込む 20ms 20month
違う都道府県のホストにpingする(往復) 150ms 12.5year
バーチャルマシンの再起動 4s 300year
サーバーの再起動 5m 25000year

CPUはコンピューターの処理コアで、貴重なリソースになります。CPUの実行時間を無駄遣いし、利用率を低下させると、プログラムの効率も必然的に低下します。上記の表が示したように、1Gbpsのネットワークで2KBのデータをアップロードしたら、CPUの感覚では14時間を過ごしたようなものです。もし、10Mbpsのネットワークとなると、更に100倍も効率が下がります。この長い時間を、CPUをただただ待たせて他の処理に移さない行為はまさにCPUの「青春」の無駄遣いになります。

2-2. 現実の問題点

  • コスト

プログラムはコンピューターの計算リソースを有効に利用できないと、その穴を埋めるために、より多くのコンピューターが必要になってきます。例えば、スクレイピングのプログラムを非同期プログラミングで設計し直すと、もともと必要とした7台のサーバーを3台まで減らせて、コストを57%削減することができます。ちなみに、AWSではm4.xlargeのリザーブドインスタンスは一年あたり15万円ぐらいかかります。

  • 効率

お金はどうでも良いなら、効率はどうしても気になるでしょう。サーバーの数を一定量まで増やしたら、アーキテクチャーやプログラムのデザインを改善しないと、更に増やしても性能が上がらないことがあります。そして、管理コストが圧倒的に増えることになります。

  • C10K/C10M問題

「C10K問題」は1999年に提出されたもので、どのようにすれば、1GhzのCPU、2Gメモリと1Gbpsのネットワーク環境で、シングルサーバーで1万のクライアントに同時にFTPサービスを提供することができるだろうというチャレンジみたいなものです。2010年から、ハードウェアの性能の向上により、C10Kに続き、「C10M問題」が提出されました。C10M問題は、8コアのCPU、64Gメモリと10Gbpsのネットワーク環境で、1秒あたり100万の同時アクセスを処理する問題です。

コストと効率は企業経営の角度からの問題で、C10K/C10M問題はハードウェアに対する技術的なチャレンジになります。C10K/C10M問題が解決できたら、コストと効率の問題も同時に解決されることになります。

2-3. 解決方法

CPUは計算が非常に速いが、コンテキストスイッチ、メモリ読み込み、ハードディスク読み込み、ネットワーク通信が非常に遅いです。つまり、CPUから離れると、L1キャッシュ以外は全部遅いです。コンピューターは5大装置の入力装置、出力装置、記憶装置、制御装置、演算装置からなりますが、制御装置と演算装置はCPUの中にありますが、それ以外は全部I/Oです。メモリの読み書き、ハードディスクの読み書き、ネットワークインタフェースカードに対する読み書きは全てI/Oになります。I/Oは最大なボトルネックになっています。

非同期プログラムは効率を上げることができますが、最大なボトルネックはI/Oなので、それに対するソリューションは「非同期I/O」と呼びます。

3. 非同期I/Oの進化の道

この地球上、一番規模の大きいプログラムはインターネットでしょう。そして、CPUの時間感覚の表から、ネットワークI/OはハードディスクI/Oよりも遅く、最大なボトルネックになっていることが分かります。サーバーの再起動などを除き、ネットワークI/Oより遅いものはないため、様々な非同期フレームワークはネットワークI/Oを狙っています。

スクレイピングを例として説明します。ここでは、ネットから10個のWebページをダウンロードします。

3-1. 同期ブロッキング方式

一番簡単なのは順番にダウンロードする方式です。socketの接続の確立からリクエスト送信、レスポンス受信という順番で実行されます。

import socket
import time


def blocking_way():
    sock = socket.socket()
    # blocking
    sock.connect(('example.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        # blocking
        chunk = sock.recv(4096)
    return response


def sync_way():
    res = []
    for i in range(10):
        res.append(blocking_way())
    return len(res)


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        sync_way()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

実行結果:

elapsed_time: 2.76[sec]
elapsed_time: 2.58[sec]
elapsed_time: 2.56[sec]
elapsed_time: 2.58[sec]
elapsed_time: 2.85[sec]
elapsed_time: 2.66[sec]
elapsed_time: 2.60[sec]
elapsed_time: 3.38[sec]
elapsed_time: 2.88[sec]
elapsed_time: 2.67[sec]
mean_elapsed_time: 2.75[sec]

10回の平均時間は2.75秒です。blocking_way()関数はsocket接続を確立し、HTTPリクエストを送り、socketからHTTPレスポンスを読み取り、データを返します。そして、sync_way()関数はそれを10回繰り返しただけです。上記のコードでは、sock.connect(('example.com', 80))でサーバーの80番に対してリクエストを送り、sock.recv(4096)socketから4KBずつバイトデータを読み取ります。

ネットワークの接続がいつ確立されるのはクライアント側が決めることではなく、ネットワーク環境やサーバーの処理能力によって決まるものです。そして、サーバーからいつデータが返ってくるのも予測できないです。そのため、デフォルトではsock.connect()sock.recv()はブロッキングします。一方で、sock.send()は長くブロッキングはしません。sock.send()はリクエストデータをTCP/IPプロトコルスタックのバッファにコピーしたらすぐ戻り値を返すため、サーバーからのレスポンスを待たないです。

もし、ネットワーク環境が非常に悪くて、接続の確立に1秒がかかるとしたら、sock.connect()は1秒ブロッキングします。この1秒は2.6GHzのCPUにとって、83年のような時間感覚になります。この83年間、CPUは何もできません。sock.recv()も同じく、サーバーからのレスポンスをクライアントが受信するまで待たなければなりません。example.comのホームページを10回ダウンロードしたら、このブロッキングは10回繰り返します。では、一日1000万のWebページをダウンロードする大規模なスクレイピングはどうなるでしょう。

まとめると、同期ブロッキング方式のようなネットワークI/Oは、効率が非常に低くて、頻繁に通信するプログラムの中ではなおさらです。このような方法は、C10K/C10Mを解決できるわけがありません。

3-2. 同期ブロッキング方式の改良:マルチプロセス方式

同じプログラムで10回実行するのは時間がかかるなら、10個同じプログラムを同時に実行させれば良いでしょう。そこで、マルチプロセスを導入します。ちなみに、Linux 2.4以前のOSにおいては、プロセスこそがタスクのエンティティであり、OSはプロセス指向にデザインされたのです。

import socket
import time
from concurrent.futures import ProcessPoolExecutor


def blocking_way():
    sock = socket.socket()
    # blocking
    sock.connect(('example.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        # blocking
        chunk = sock.recv(4096)
    return response


def multi_process_way():
    with ProcessPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(blocking_way) for i in range(10)}
    return len([future.result() for future in futures])


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        multi_process_way()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

実行結果:

elapsed_time: 0.49[sec]
elapsed_time: 0.47[sec]
elapsed_time: 0.48[sec]
elapsed_time: 0.49[sec]
elapsed_time: 0.54[sec]
elapsed_time: 0.51[sec]
elapsed_time: 0.56[sec]
elapsed_time: 0.52[sec]
elapsed_time: 0.47[sec]
elapsed_time: 0.47[sec]
mean_elapsed_time: 0.50[sec]

10回の平均時間は0.50秒です。効果がありましたね。ただし、同期ブロッキング方式の10分の1になってないのが問題です。その理由は実行環境のCPUは10コアではなく、プロセスの切り替えが必要だからです。

プロセスの切り替えはCPUの時間感覚の「コンテキストスイッチ」項目で示したように安くない処理です。CPUは1つのプロセスからもう1つのプロセスに切り替える時、まず、元のプロセスランタイムのレジスタ状態、メモリ状態を全部保存し、それからもう1つのプロセスの保存した状態を復元します。CPUにとっては、数時間待たされたようなものです。プロセス数がCPUのコア数より多い時は、プロセスの切り替えが必ず必要になります。

切り替え以外に、マルチプロセスにはもう1つの欠点があります。普通のサーバーは安定した状態で作動するため、同時処理できるプロセスの数は数十から数百規模に留まっています。プロセス数があまりにも多いと、システムが不安定になり、メモリリソースも足りなくなります。

そして、マルチプロセスは切り替えや規模の小さ以外にも、状態やデータの共有などの問題があります。

3-3. 同期ブロッキング方式の更なる改良:マルチスレッド方式

スレッドのデータ構造はプロセスより軽量で、プロセス内に複数のスレッドを持つことができます。Linux 2.4より新しいOSも、スケジューリングできる最小単位をプロセスからスレッドに変更しました。プロセスはただスレッドの容器として存在し、リソースの管理という役割を果たすようになりました。OSレベルのスレッドはCPUのそれぞれのコアに分配され、同時実行できるものです。

import socket
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_way():
    sock = socket.socket()
    # blocking
    sock.connect(('example.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        # blocking
        chunk = sock.recv(4096)
    return response


def multi_thread_way():
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(blocking_way) for i in range(10)}
    return len([future.result() for future in futures])


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        multi_thread_way()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

実行結果:

elapsed_time: 0.31[sec]
elapsed_time: 0.32[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.31[sec]
elapsed_time: 0.31[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.31[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.27[sec]
mean_elapsed_time: 0.30[sec]

10回の平均時間は0.30秒です。予想通り、マルチプロセスより速かったですね。スレッドにもコンテキストスイッチは存在しますが、プロセスと比べると大分軽いものになっています。これで、マルチスレッドはマルチプロセスのプロセス切り替えが遅いという問題を解決したようで、しかも同時処理できるタスクの規模もマルチプロセスの数百個から数千個に増加しました。

しかし、マルチスレッドにも問題があります。まず、Pythonのマルチスレッドは「GIL」の存在で、マルチコアCPUのアドバンテージを利用できないという問題があります。1つのPythonプロセス内で、ある時刻において1つのスレッドだけのアクティブ状態が許容されます。なら、なぜマルチスレッドはマルチプロセスよりも速かったでしょう。

理由はsock.connect()sock.recv()のようなブロッキングするシステムコールを呼び出す時、現在のスレッドはGILを解放し、他のスレッドに実行できるチャンスを与えるからです。ただし、シングルスレッドの中でしたら、ブロッキングするシステムコールはそのままブロッキングします。

豆知識:
Pythonのtime.sleepはブロッキングする処理ですが、マルチスレッドプログラミングでしたら、time.sleep()は他のスレッドをブロッキングしません。

GIL以外に、マルチスレッドの共通した問題点もあります。スレッドはOSにスケジューリングされ、その「スケジューリングの戦略(scheduling strategy)」は「プリエンプション」で、同じ優先度のスレッドは均等的なチャンスで実行されるのが保証されます。プリエンプションは先取りの戦略なので、次の時刻にどのスレッドが実行されるのか、どのコードが実行されるのかは予測できず、「競合状態」になることがあります。

例えば、スクレイピングのワーカースレッドはタスクキューから次にスクレイピングするURLをpollする時、もし複数のスレッドが同時にpollしにきたら、どれに渡すかが問題になります。そこで、同じタスクが複数回実行されないようにロックや同期キューが必要になってきます。

また、マルチスレッドは数百から数千のマルチタスクを同時処理できますが、大規模かつ高頻度のウェブシステムに対してはまだ力不足です。もちろん、マルチスレッドの最大な問題点はやはり競合状態です。

3-4. 非同期ノンブロッキング方式

ようやく、ノンブロッキング方式まで来ました。まずは、一番原始的なノンブロッキングはどう作動するのかを見てみましょう。

import socket
import time


def nonblocking_way():
    sock = socket.socket()
    sock.setblocking(False)
    # socketはノンブロッキング接続リクエストを送信すると、OSはエラーを起こすため
    try:
        sock.connect(('example.com', 80))
    except BlockingIOError:
        pass
    request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    data = request.encode('ascii')
    # socketの接続がいつ確立されるか予測できないため、送信を繰り返す
    while 1:
        try:
            sock.send(data)
            break
        except OSError:
            pass

    response = b''
    # レスポンスがいつ読み取れるか予測できないため、受信を繰り返す
    while 1:
        try:
            chunk = sock.recv(4096)
            while chunk:
                response += chunk
                # blocking
                chunk = sock.recv(4096)
            break
        except OSError:
            pass
    return response


def sync_way():
    res = []
    for i in range(10):
        res.append(nonblocking_way())
    return len(res)


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        sync_way()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

実行結果:

elapsed_time: 2.71[sec]
elapsed_time: 2.82[sec]
elapsed_time: 2.73[sec]
elapsed_time: 2.69[sec]
elapsed_time: 2.82[sec]
elapsed_time: 2.72[sec]
elapsed_time: 2.51[sec]
elapsed_time: 2.65[sec]
elapsed_time: 2.75[sec]
elapsed_time: 3.50[sec]
mean_elapsed_time: 2.79[sec]

10回の平均時間は2.79秒です。騙された気分ですね。同期ブロッキングと同じレベルの計算時間なのに、コードが余計に複雑になっています。ノンブロッキングなんかいらないと思うかもしれません。

まず、上記のコードでは、sock.setblocking(False)で、OSにsocketのブロッキングするシステムコールをノンブロッキングに変更するよう指示を出しました。前述のように、ノンブロッキングは1つのことをやるときに、自分を呼び出したプログラムが他のことをやるのを邪魔しないことです。上記のコードは、sock.connect()sock.recv()は実行後、確かにブロッキングしなくなりました。

そして、コードが複雑になったのもブロッキングしなくなったからです。connect()が呼び出されると、OSはまずエラーを起こすため、ここでtryでキャッチする必要があります。そして、ここではブロッキングせずに、すぐに下のコードへ移ります。

while文で繰り返し、send()を実行するのは、connect()がノンブロッキングになり、いつ接続が確立されるのか分からないため、試し続けるしかないからです。そして、send()が実行されても、レスポンスがいつ来るか分からないため、recv()の呼び出しも繰り返して実行しています。

connect()recv()はメインプログラムをブロッキングしなくなったが、CPUのあまった時間は有効に利用できてないです。その時間はただwhileループでsocketの読み書きの繰り返しやエラー処理などに費やしていました。

そして、10個のダウンロードは依然のまま順番に実行されるため、トータルの計算時間は同期ブロッキング方式と変わらないわけです。

3-5. 非同期ノンブロッキン方式の改良

3-5-1. epoll

もしOS側はノンブロッキングの各呼び出しの用意ができているのかのチェックをしてくれるなら、アプリケーション側でループで待ったり判断したりしなくて済むので、空いた時間を他の処理に回すことで効率をアップできますね。

そのため、OSはI/O状態の変更をイベントとしてカプセル化してくれました。例えば、読み取り可能イベント、書き込み可能イベントなどがあります。そして、アプリケーションがイベント通知を受け取れるようにシステムモジュールも提供してくれました。そのモジュールはselectです。アプリケーションはselectを通して、「ファイル記述子」やコールバック関数を登録できます。ファイル記述子の状態が変化したら、selectは事前に登録されたコールバック関数を呼び出します。このやり方をI/O多重化(I/O multiplexing)と言います。

selectはアルゴリズムの効率が悪いため、のちにpollとして改良されました。さらに、BSDカーネルはkqueueモジュールに、Linuxカーネルはepollモジュールに改良しました。この4つのモジュールの機能は同じで、利用できるAPIもほぼ一緒です。kqueueepollは大量なファイル記述子を処理する時の効率が他の2つのモジュールより良いという違いがあります。

Linuxサーバーが普及しているため、epollモジュールはよく耳にしますね。ファイル記述子の数をNとして、select・pollO(N)の時間計算量で処理するのに対して、epollO(1)で処理できます。また、epollはリスニングする全てのファイル記述子を、1個の特殊なファイル記述子でマウントします。このファイル記述子はマルチプロセス・スレッドで共有できるものです。

3-5-2. コールバック

I/OイベントのリスニングをOSに任せました。I/O状態が変化した時(例えばsocket接続が確立されデータを送信できるようになった時)、OSは次に何をすれば良いでしょう。それはコールバックです。

ここでは、データの送信と読み込みをそれぞれ独立した関数にカプセル化する必要があります。epollはアプリケーションの代わりにsocket状態をリスニングする時に、epollに「socket状態が書き込み可能な状態(接続が確立された)になったら、HTTPリクエスト送信関数を呼び出してください。socket状態が読み取り可能な状態(クライアントがレスポンスを受信した)になったら、レスポンス処理関数を呼び出してください」と教える必要があります。

epollとコールバックを利用して、スクレイピングのコードをリファクタリングします。

import socket
import time
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ


class Crawler:
    def __init__(self, path):
        self.path = path
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('example.com', 80))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

    def connected(self, key, mask):
        selector.unregister(key.fd)
        get = f'GET {self.path} HTTP/1.0\r\nHost: example.com\r\n\r\n'
        self.sock.send(get.encode('ascii'))
        selector.register(key.fd, EVENT_READ, self.read_response)

    def read_response(self, key, mask):
        global stopped
        chunk = self.sock.recv(4096)
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)
            paths_todo.remove(self.path)
            if not paths_todo:
                stopped = True

前と違うのは、同じページではなく10個の違うページをダウンロードするところです。ダウンロードするページパスの集合paths_todoは後で定義します。ここでは、改善したところを見てみましょう。

まず、send()recv()の2つのループが消えました。

続いて、selectorsモジュールを導入し、DefaultSelectorインスタンスを作成しました。Python標準ライブラリーのselectorsselect/poll/epoll/kqueueをカプセル化したものです。DefaultSelectorはOSによって一番良いモジュールを選んでくれます。Linux 2.5.44以上のバージョンは全部epollになっています。

そして、socketの書き込み可能イベントEVENT_WRITEと読み取り可能イベントEVENT_READが発生した後に処理するコールバック関数も登録しました。

コードの構造が綺麗になり、ブロッキングの通知などもOSに任せました。しかし、10個違うページをダウンロードするには、10個のCrawlerインスタンスが必要になり、20個のイベントが発生します。どうやってselectorから今発生したイベントを取得して、対応するコールバックを実行するでしょう。

3-5-3. イベントループ

上記の問題を解決するには、古いやり方を採用するしかないです。つまり、ループです。selectorモジュールにアクセスし、どのイベントが発生したか、どのコールバックを呼び出すべきかを知らせてくれるまで待ちます。この待ちのイベント通知ループをイベントループ(event loop)と言います。

def loop():
    while not stopped:
        # 何らかのイベントが発生するまでブロッキングする
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)

上記のコードで、stoppedというグローバル変数でイベントループがいつ止まるかをコントロールします。paths_todoが全部消費されたら、stoppedTrueに変更します。

そして、selector.select()はブロッキング呼び出しです。ここでイベントが発生しないと、アプリケーションは処理できるものがないため、イベントが発生するまでブロッキングする必要があります。予想できますが、1つのWebページだけダウンロードするとき、connect()の後に、send()そしてrecv()ができるようになるため、同期ブロッキング方式と処理の効率は同じです。理由はconnect()recv()でブロッキングしなくても、select()ではブロッキングするからです。

なので、selector機能(ここからはepoll/kqueueと呼ぶ)は大規模な並行アクセスを解決するためにデザインされたものです。システムの中に大量なノンブロッキング呼び出しがあり、ほぼランダムにイベントを生成できる状態でこそ、selector機能は最大な威力を発揮できます。

下のコードは10個のダウンロードタスクを作成し、イベントループを起動したものです。

if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        selector = DefaultSelector()
        stopped = False
        paths_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
        start = time.time()
        for path in paths_todo:
            crawler = Crawler(path)
            crawler.fetch()
        loop()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

実行結果:

elapsed_time: 0.29[sec]
elapsed_time: 0.27[sec]
elapsed_time: 0.24[sec]
elapsed_time: 0.27[sec]
elapsed_time: 0.29[sec]
elapsed_time: 0.29[sec]
elapsed_time: 0.32[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.30[sec]
elapsed_time: 0.34[sec]
mean_elapsed_time: 0.29[sec]

10回の平均時間は0.29秒です。強いですね。シングルスレッドの中で、「イベントループ+コールバック」で、10個のページの同時ダウンロードという問題を解決しました。これこそ、非同期プログラミングです。forループがあり、順番的にCrawlerインスタンスを作成し、fetchメソッドを呼び出ししているように見えますが、fetchメソッドの処理はconnect()とイベントの登録だけです。そして、実行時間から見れば、明らかにダウンロードタスクが同時に行われました。

上記コードの非同期処理の手順:

  1. Crawlerインスタンスの作成
  2. fetchメソッドを呼び出し、socketの接続を作りとselectorに書き込み可能イベントを登録する
  3. fetchにはブロッキング処理がないため、すぐに返す
  4. 上記のステップを10回繰り返し、10個のダウンロードタスクを全部イベントループに追加
  5. イベントループを起動し、最初のループに入り、イベントのリスニングにブロッキングする
  6. あるダウンロードタスクのEVENT_WRITEが発生したら、そのconnectedメソッドをコールバックし、最初のループを終了する
  7. 2ラウンド目のループに入り、あるダウンロードタスクにイベントが発生したら、そのコールバック関数を実行する;この時点でどのイベントが発生するかは推測不可能になり、前回のconnectedの中のEVENT_READが発生するかもしれないし、他のタスクのEVENT_WRITEが発生する可能性もあります(この時、1つのタスクにブロッキングした時間は他のタスクの実行に利用される)
  8. 全てのダウンロードタスクが実行されるまでループが続く
  9. ループを終了し、プログラム全体を終了する

3-5-4. まとめ

同期ブロッキング方式から非同期ノンブロッキング方式まで見てきました。そして、シングルスレッドの中で、複数のネットワークI/Oブロッキング型タスクを並行処理する黒魔術もできるようになりました。マルチスレッドと比べると、スレッドの切り替えすらなくなりました。コールバックの実行は関数呼び出しで、スレッドのスタック内で完結します。しかも、性能も優れています。シングルサーバーで同時処理できるタスク数も数万から数十万といった規模になりました。

一部のプログラミング言語は、非同期プログラミングに対するサポートはここまでです。エンジニアは直接epollを使って、イベントやコールバックの登録、イベントループの管理、そして、コールバックの設計に多くの時間を費やす必要があります。

ここまでの内容を見ると分かると思いますが、どの言語を使っても、非同期プログラミングをするなら、上記の「イベントループ+コールバック」パターンから逃げられないです。ただし、epollを使ってないかもしれないし、whileループではない可能性もあります。しかし、どれも「後で教えるよ」というモデルの非同期方式になります。

では、どうして一部の非同期プログラミングでは、コールバックパターンが見られてないでしょうか。これについてはこれから見ていきます。Pythonの非同期プログラミングの重役である、コルーチンについてはまだ何も話していないからですね。

4. Pythonの非同期I/Oに対する改良の道

ここからはPythonの非同期プログラミングのエコシステムはどうやって前述の「イベントループ+コールバック」パターンを受け継いだのかを説明し、そこからどのようにasyncioというネイティブコルーチンパターンに進化したのかを見ていきましょう。

4-1. コールバック地獄

3. 非同期I/Oの進化の道で、シングルスレッドで非同期プログラミングを実現した「イベントループ+コールバック」の基本構造について見てきました。確かに、「イベントループ+コールバック」は大きくプログラムの効率を上げることができます。しかし、まだまだ問題解決ではないです。実際のプロジェクトは非常に複雑になるので、以下の問題を考えなくてはならないです。

  • コールバックが正常に実行されなかった場合どうする
  • どのようにコールバックの中にコールバックを組み込み、多重構造にする
  • 多重構造にしたら、ある層でエラーが起きたらどうなる
  • あるデータを全てのコールバックに処理させる必要がある時はどうする
  • ...

実際のプロジェクトでは、これらの問題は避けられないです。そして、問題の背後にコールバックパターンの欠点が隠れています。

  • 多重コールバックにすると、可読性が著しく低下する
def callback_1():
    # 処理
    def callback_2():
        # 処理
        def callback_3():
            # 処理
            def callback_4():
                # 処理
                def callback_5():
                    #処理
                async_function(callback_5)
            async_function(callback_4)
        async_function(callback_3)
    async_function(callback_2)
async_function(callback_1)

(Lisp信者はご容赦ください)

  • コードの構造をぶっ壊すことになる

同期プログラミングのコードを書く時、関連のある処理は普通に上から下へと実行されます。

do_a()
do_b()

もし、do_b()do_a()の結果に依存し、そしてdo_a()は非同期呼び出しの場合、do_a()の結果がいつ返ってくるか分からないため、後続の処理をコールバックの形式でdo_a()に渡す必要があります。こうすることで、do_a()が完了してからdo_b()の実行に入ることを保証できます。

do_a(do_b())

そして、長い処理フローを全部非同期処理にすると、こうなります。

do_a(do_b(do_c(do_d(do_e(do_f(...))))))

上記のスタイルは「コールバック地獄スタイル」と言います。しかし、主な問題点は見た目ではなく、本来の上から下への構造を外から内へと変更する必要があるところです。まずはdo_a()、そしてdo_b()、続いてdo_c()、…、一番内側にあるdo_f()へと実行していくことになります。同期処理で、do_a()の後にdo_b()というのは、スレッドの命令ポインタがフローをコントロールしてくれます。しかし、コールバックパターンになると、フローのコントロールはエンジニアが注意しながら配置することになります。

  • 状態の共有や管理が難しい

3-1の同期バージョンsockオブジェクトは上から下へとずっと使い回されるのに対して、3-5のコールバックバージョンCrawlerクラスをインスタンス化して、sockオブジェクトをselfに保存する必要があります。オブジェクト指向的なプログラミングスタイルを採用しないと、共有する必要のある状態をバトンタッチみたいにそれぞれのコールバック関数に渡す必要が出てきます。そして、複数の非同期呼び出しの間に、どの状態を共有しなればならないかは事前に計画して、入念にデザインする必要があります。

  • エラー処理が難しい

一連のコールバックは呼び出しのチェーン(「メソッドチェーン」)を構成します。例えば、do_a()からdo_f()というようなチェーンがあります。もし、do_d()がエラーを起こしたらどうなると思いますか?チェーンが切れて、バトンタッチしてた状態も失います。そして、「スタックトレース」をぶっ壊すことになります。例えば、do_d()がエラーを起こし、do_c()の内部でdo_d()の呼び出しがエラーになったため、do_c()自体もエラーを起こすことになります。同様にdo_b()do_a()もエラーを起こすことになり、結局エラーログは「do_a()の呼び出しがエラーを起こした」とだけ報告することになります。しかし、実際エラーを起こしたのはdo_d()です。これを防ぐために、エラーを全部キャッチして、関数の戻り値としてデータを返させる必要があります。そして、全てのコールバック関数は前の関数の戻り値をチェックする必要があり、「エラーの隠蔽」を防ぎます。

というわけで、可読性はただ見栄えの問題だとしても、スタックトレースをぶっ壊すのと状態の共有や管理が難しいといった2つの欠点はコールバックベースの非同期プログラミングの異常な難しさの原因になります。あらゆるプログラミング言語はこの問題を解決しようとしています。そのおかげで、「Promise」「コルーチン」といったソリューションが生まれたわけです。

4-2. 課題

「非同期タスクがいつ完了するか」、「非同期呼び出しの戻り値をどう処理するか」といった非同期プログラミングの難点について、「イベントループ+コールバック」パターンで解決できました。しかし、コールバックはプログラムを複雑にしてしまいます。この欠点を回避する手段を考える前に、まず本質的なところをはっきりする必要があります。なぜ、コールバックは必須なのでしょうか?そして、そもそも、欠点の1つとされる状態の共有や管理は何のためでしょうか?

状態の共有や管理が必要なのは、プログラムは自分は何をした、今何している、これから何をするというのを知る必要があるからです。言い換えると、プログラムは自身の現在の状態を把握する必要があり、その状態をそれぞれのコールバックにバトンタッチして、保持し続ける必要もあります。

複数のコールバック間の状態の管理が難しいです。ならば、それぞれのコールバックに自分の状態だけを管理させるのはどうでしょう?呼び出しのチェーンは、エラーハンドルを難しくします。ならば、呼び出しのチェーンを使わないのはどうでしょう?しかし、呼び出しのチェーンを使わないなら、呼び出された関数は前の関数が完了しているかどうかをどう知るのでしょうか?では、コールバックに次のコールバックに通知させるのはどうでしょう?そもそも、コールバックというのは、保留中(pending)のタスクという見方もできますね。

タスク間の相互通知、タスク毎に自分の状態を持たせる、これはまさに古から存在したプログラミング作法「協調的マルチタスク」です。しかし、シングルスレッドの中にスケジューリングする必要があります。ここからは、スタックフレームを持ち、自身の状態を簡単に知ることができる「コルーチン」の出番です。コールチン間はもちろん通知し合うことも可能です。

4-3. コルーチン

コルーチンは「サブルーチン」を一般化したものとされています。コルーチンのスケジューリング戦略は「ノンプリエンプティブ」で、複数のエントリーで、処理の中断と再開をコントロールできます。

サブルーチンはプログラミング言語が定義した呼び出し可能なコードブロックです。言い換えると、ある機能を実現するためにパックされた一連の命令ですね。一般的なプログラミング言語では、関数やメソッドといった構造でサブルーチンを実現しています。

4-4. ジェネレーターベースのコルーチン

Pythonの中に特殊なオブジェクト「ジェネレーター」があります。ジェネレーターの特徴はコルーチンと似ていて、イテレーションの間に、中断でき、そして次のイテレーションまで今までの状態を失うことがありません。

ジェネレーターで簡単なコルーチンを実現するため、Python 2.5でジェネレーターの機能が強化(PEP 342)されました。この機能強化の提案タイトルは「Coroutines via Enhanced Generators」です。PEP 342のおかげで、ジェネレーターはyieldで実行を中断し、データを返すことができるようになりました。そして、sendでジェネレーターにデータを送ることもでき、throwでジェネレーター内にエラーを起こさせ、終了させることも可能になりました。

次に、ジェネレーターベースのコルーチンで、スクレイピングのプログラムをリファクタリングします。

4-4-1. Futureオブジェクト

コールバック方式を辞めると、非同期呼び出しの結果をどうやって知るのでしょうか?ここでは、まずオブジェクトを定義します。非同期呼び出しの結果が返ってきたら、その中に保存します。このオブジェクトをFutureオブジェクトと呼びます。

import socket
import time
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ


class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

Futureオブジェクトはresultインスタンス変数を持ち、将来の実行結果を蓄積します。そして、set_resultメソッドはresultを設置するもので、resultに値をバインドしたら、Futureオブジェクトに事前に追加されたコールバック関数を実行します。コールバック関数は、add_done_callback()メソッドで追加できます。

コールバック方式を辞めると言いませんでしたっけ?慌てないでください。ここで説明したように、非同期プログラミングをするなら、「イベントループ+コールバック」パターンから逃げられないです。そして、ここのコールバックは前のものと少し違います。

4-4-2. Crawlerをリファクタリング

とにかく、将来のデータを表すFutureオブジェクトを作りました。Futureオブジェクトでスクレイピングのコードをリファクタリングしてみます。

class Crawler:
    def __init__(self, path):
        self.path = path
        self.response = b''

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('example.com', 80))
        except BlockingIOError:
            pass
        f = Future()

        def on_connected():
            f.set_result(None)

        # fileno()メソッドはソケットのファイル記述子を短い整数型で返します
        selector.register(sock.fileno(), EVENT_WRITE, on_connected)
        yield f
        selector.unregister(sock.fileno())
        get = f'GET {self.path} HTTP/1.0\r\nHost: example.com\r\n\r\n'
        sock.send(get.encode('ascii'))

        global stopped
        while True:
            f = Future()

            def on_readable():
                f.set_result(sock.recv(4096))

            selector.register(sock.fileno(), EVENT_READ, on_readable)
            # sendされたresultを受け取る
            chunk = yield f
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                paths_todo.remove(self.path)
                if not paths_todo:
                    stopped = True
                break

前のコールバックバージョンと比べると、かなり違いが出ています。fetchメソッドにyield表現を使い、ジェネレーターにしました。ジェネレーターはnext()でイテレーションを1回回すかsend(None)で起動でき、yieldまで行くと中断します。では、fetchジェネレーターはどうやって再開するのでしょうか?

4-4-3. Taskオブジェクト

上記の問題を解決するために、1つのルールに従う必要があります。それは、「単一責任の原則」 です。そのため、ここでは、ジェネレーターの再開や状態の管理という役割を果すものを作ります。名前はTaskにします。

class Task:
    def __init__(self, coro):
        # コルーチンオブジェクト
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            # sendすると、ジェネレーターは次のyieldまで実行される
            # next_futureはyieldが返したオブジェクト
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)

上記コードのTaskオブジェクトはcoroコルーチンオブジェクトをパックします。管理するタスクは、保留中(pending)のタスク(コルーチン)のため、ここのcorofetchジェネレーターになるわけです。そして、stepメソッドがあり、初期化の時にまず1回実行されます。stepメソッドはジェネレーターのsend()メソッドを呼び出し、初期化の時はsend(None)になるため、coroすなわちfetch()の初回イテレーションが回されます。

send()完了後、次のfutureが得られて、add_done_callbackを使って次のfuturestep()というコールバック関数を追加します。

次に、fetch()ジェネレーターを見てみましょう。リクエストの送信やレスポンスの受信などのビジネスロジックは内部で完結しています。そして、selectorに登録するコールバック関数もシンプルになりました。2つのyieldは対応するfutureを返し、Task.step()内で受け取ります。これで、TaskFutureCoroutineをうまく繋ぎ合わせることができました。

Taskオブジェクトを初期化し、fetch()が最初のyieldまで実行されます。では、どうやって再開するのでしょうか?

4-4-4. イベントループ

イベントループの再登場です。最初のyieldまで来たら、登録されたEVENT_WRITEが発生するのを待ちます。イベントループは心拍みたいに、脈動し始めれば、ずっと動き続きます。

def loop():
    while not stopped:
        # 何らかのイベントが発生するまでブロッキングする
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()


if __name__ == '__main__':
    elapsed_times = 0
    for _ in range(10):
        selector = DefaultSelector()
        stopped = False
        paths_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
        start = time.time()
        for path in paths_todo:
            crawler = Crawler(path)
            Task(crawler.fetch())
        loop()
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

実行結果:

elapsed_time: 0.30[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.29[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.28[sec]
elapsed_time: 0.26[sec]
mean_elapsed_time: 0.27[sec]

10回の平均時間は0.27秒です。今回のloopは前と少し違います。callback()event_keyとevent_maskを受け取らなくなりました。つまり、ここのコールバックは誰がイベントを発生させたのを知る必要がなく、fetch()と合わせて見れば分かりますが、コールバックはただfutureset_result()で値を設置すれば良いです。そして、どのfutureなのも知る必要がなく、コルーチンは自身の状態を保存でき、自分のfutureさえ分かれば問題ないです。どの値を設置するのも気にする必要がなく、コルーチンは全部処理してくれます。

4-4-5. 「ジェネレーター・コルーチンスタイル VS コールバックスタイル」のまとめ

コールバックスタイル:

  • 呼び出しのチェーンが存在する
  • リクエストとレスポンスは2つのコールバックに分けなければならない。コードの構造を破壊した
  • コールバック間の状態保持をエンジニアが管理する必要がある

ジェネレーター・コルーチンスタイル:

  • 呼び出しのチェーンが存在しない
  • selectorのコールバックはfutureに値を設置するだけで、ビジネスロジックから切り離せた
  • loop内のcallback()は誰がイベントを発生させたのを知る必要がなくなった
  • 同期処理に近い構造になった
  • 各コルーチン間の状態保持をエンジニアが管理する必要がなくなった(例えば、自分のsockはどれかなど)

4-4-6. コードを更にリファクタリングする

コードは少し読みにくいですね。ここでもし、fetchの耐欠陥性、機能を更に高度にしてくださいと言われたらどうすれば良いでしょう。また、技術ロジック(socket関連)とビジネスロジック(リクエストとレスポンスの処理)が混在していて良くないですね。

  • socket接続の作成は抽象化(関数・メソッド化)できる
  • レスポンスの読み込みループも抽象化できる
  • ループ内のsocket.recv()も抽象化できる

しかし、これらのところにyieldが存在し、抽象化するなら、ジェネレーターにする必要があります。それに、fetch()自身もジェネレーターで、ジェネレーターの中でジェネレーターを弄るのは、コードを更に複雑にするかもしれません。

Pythonの設計者たちもこの問題に気づき、yield fromというジェネレーターの中でジェネレーターを弄るおもちゃを用意しくれました。

4-5. yield fromでジェネレーターベースのコルーチンを改良

4-5-1. yield from文法の紹介

yield fromはPython 3.3から導入された文法(PEP 380)です。PEP 380は主に、ジェネレーターの中でジェネレーターを弄る時の不便を解消するためのもので、2つの機能があります。

その1つは、イテレーションを回してサブジェネレーターをyieldする必要がなくなり、直接yield fromできるという機能です。以下の2種類のジェネレーターは機能として同等です。

def gen_1():
    sub_gen = range(10)
    yield from sub_gen


def gen_2():
    subgen = range(10)
    for item in subgen:
        yield item

もう1つは、サブジェネレーターとメインジェネレーターの間に相互通信できるチャンネルを開けてくれる機能です。

def gen():
    yield from sub_gen()


def sub_gen():
    while 1:
        x = yield
        yield x + 1


def main():
    g = gen()
    next(g)  # 最初のyieldまで実行させる
    retval = g.send(1)  # gen()にデータを送るように見えるが、実はsub_gen()に送っている
    print(retval)  # sub_gen()から計算済みの2が出力される
    g.throw(StopIteration)  # sub_gen()のところでエラーを起こす

上記のコードはyield fromの相互通信機能を示したものです。yield fromgen()の内部に、sub_gen()main()の間に通信チャンネルを開けてくれています。main()から直接データ1sub_gen()に送ることができ、sub_gen()からも直接計算済みの値2main()に返すことができます。また、main()から直接sub_gen()にエラーを送ることで、sub_gen()を終了させることもできます。

ちなみに、yield fromyield from <generator>だけではなく、yield from <iterable>も可能です。

4-5-2. リファクタリング

socket接続を抽象化

def connect(sock, address):
    f = Future()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        pass

    def on_connected():
        f.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from f
    selector.unregister(sock.fileno())

レスポンスの読み込みを抽象化

def read(sock):
    f = Future()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield from f
    selector.unregister(sock.fileno())
    return chunk


def read_all(sock):
    response = []
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)
    return b''.join(response)

Crawlerのリファクタリング

class Crawler:
    def __init__(self, path):
        self.path = path
        self.response = b''

    def fetch(self):
        global stopped
        sock = socket.socket()
        yield from connect(sock, ('example.com', 80))
        get = f'GET {self.path} HTTP/1.0\r\nHost: example.com\r\n\r\n'
        sock.send(get.encode('ascii'))
        self.response = yield from read_all(sock)
        paths_todo.remove(self.path)
        if not paths_todo:
            stopped = True

ここまでのコードは特に問題ありません。再利用できる部分は関数として抽象化しました。サブジェネレーターの値もyield fromで取得できるようにしました。ただし、1つ注意すべきは、futureオブジェクトを返すときに、yieldではなく、yield fromを使用している点です。yieldは普通のPythonオブジェクトに適用できますが、yield fromはできません。ここで、Futureを改造して、iterableオブジェクトにする必要があります。

class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        yield self
        return self.result

__iter__を追加しただけです。もちろん、必ずしもyield fromを使う必要はないですが、yieldのままでも問題ありません。しかし、ジェネレーターベースのコルーチンをしているのか、それともジェネレーターだけなのかを区別するために、使い分けをしたほうが良いです。そのため、Python 3.3からyield fromが導入された後、yieldでコルーチンを作るのは非推奨になったわけです。yield fromの相互通信機能を利用して、コルーチン間にデータを自由自在に送り合えるメリットもあります。

4-5-3. yield fromによるコルーチン改良のまとめ

yield fromによるコルーチンの改良で、コードの抽象レベルを高められ、ビジネスロジック関連を更にシンプル化できました。相互通信機能により、コルーチン間のデータのやりとりが簡単になりました。これで、Python非同期プログラミングは大きな進歩を遂げました。

そして、Python言語の開発者たちもyield fromをフル活用しました。Guidoさんが主導したPython非同期プログラミングフレームワークTulipも猛スピードで進化し、Python 3.4で名前をasyncioに変更して、標準ライブラリとして仮採用(on a provisional basis)しました。

4-6. asyncio

4-6-1. asyncioの紹介

asyncioはPython 3.4から実験的に導入された非同期I/Oフレームワーク(PEP 3156)です。asyncioはPythonにおいて、コルチーンによる非同期I/Oプログラミングのインフラとして提供されました。コアのコンポーネントとして、イベントループ(Event Loop)、コルーチン(Coroutine)、タスク(Task)、フューチャー(Future)とその他の補助的なモジュールから構成されています。

asyncioが導入された時、@asyncio.coroutineというデコレーターも提供されました。yield fromを使用した関数につけることで、コルーチンとしてマークすることができますが、強制使用は強いられていないです。

Python 3.4からyield fromの助力で、コルーチンが簡単に作れるようになりましたが、歴史的な問題もそうですが、人々はジェネレーターコルーチンの区別と関係性が分かりません。そして、yieldyield fromの違いも分かりません。この混乱状態は「Pythonの禅」のルールに背いています。

それで、Python 3.5から、Pythonの設計者たちは急ぎながら、async/await文法(PEP 492)を追加し、コルーチンを明示的に支持している様子を見せました。これをネイティブコルーチンと呼びます。async/awaityield fromの2つのコルーチンスタイルは内部実装が同じで、お互い互換性があります。

そして、Python 3.6から、asyncioが正式的に標準ライブラリーに仲間入りしました。以上はCPythonにおいての非同期I/Oの進化の軌跡です。

4-6-2. asyncioとネイティブコルーチン

asyncioasync/await文法の便利さを体験してみましょう。

import asyncio
import aiohttp
import time


async def fetch(session, url):
    async with session.get(url) as response:
        response = await response.read()
        return response


async def fetch_many(host, paths_todo):
    async with aiohttp.ClientSession(loop=loop) as session:
        tasks = [fetch(session, host + path) for path in paths_todo]
        responses = await asyncio.gather(*tasks)
        return responses


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    host = 'http://example.com'
    paths_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
    elapsed_times = 0
    for _ in range(10):
        start = time.time()
        loop.run_until_complete(fetch_many(host, paths_todo))
        elapsed_time = time.time() - start
        elapsed_times += elapsed_time
        print(f"elapsed_time: {(elapsed_time):.2f}[sec]")
    print(f"mean_elapsed_time: {(elapsed_times/10):.2f}[sec]")

実行結果:

elapsed_time: 0.26[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.24[sec]
elapsed_time: 0.24[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.26[sec]
elapsed_time: 0.25[sec]
elapsed_time: 0.26[sec]
mean_elapsed_time: 0.25[sec]

ジェネレーターベースのコルーチンスタイルと比較すると、asyncioスタイルは結構違いますね。

  • yieldyield fromが消え、代わりにasync/awaitが使われる
  • 自作のloop()が消え、代わりにasyncio.get_event_loop()が使われる
  • socketに対する明示的な処理は必要なくなり、aiohttpライブライがやってくれる
  • FutureTaskの定義も消え、asyncioが実装してくれている
  • 少ないコードで、エレガントな設計を実現できた

HTTPリクエストの送信とレスポンスの受信などを自分でsocketを操作しない理由は、実際の業務で、うまくHTTPプロトコルを処理するのは極めて難しくて、機能が充実した非同期HTTPクライアントさえあれば、自力でする必要がなくなるからです。

同期ブロッキングバージョンのコードと比較すると:

  • 非同期化
  • コード数はほぼ一緒(aiohttpの導入で逆に少ない)
  • コードのロジックは同じぐらいシンプル
  • 10倍ぐらいの性能アップ

まとめ

Pythonにおいての非同期プログラミングの発展から仕組みまで詳しく見てきました。最終的に、同期処理と同じぐらいシンプルなコードでN倍の効率向上を実現しました。しかも、コールバック地獄のような欠点もありません。
 
 asyncioの詳しい使い方、その長所と短所、そして、Pythonエコシステム内の他の非同期I/Oソリューション、asyncioとの区別などはまた他の記事で紹介します。

参考

A Web Crawler With asyncio Coroutines
Latency numbers every programmer should know

Discussion