threadとasyncio
この記事ではthreadingとasyncioについて扱います。
はじまり
あなたは業務でHTTPヘルスチェッカーの作成を命じられました。先日リリースしたウェブサービスがちゃんと稼働しているか確認するために、ヘルスチェック用のエンドポイントにリクエストしてそのステータスコードを取得する単純なものです。
こういったちょっとしたプログラムであれば簡単に作れそうです。あなたが得意なPythonには標準ライブラリのurllib.requestにHTTPクライアントが用意されているので、それを使ってこう書けるはずです…(今回はrequestsライブラリは使用しません)。
import urllib.request
def healthcheck(url: str):
try:
response = urllib.request.urlopen(url)
except urllib.error.HTTPError as e:
return e.code
return response.status
urlopen()
はHTTPステータスコードが4XX, 5XXの場合には例外を送出するので、try-exceptによる例外処理を入れて正しくステータスコードを返せるようにしました。
完成した関数healthcheck()
はURLを与えるだけでステータスコードを返してくれる優れものです。少し試してみましょう。
print(healthcheck('https://example.com/')) # 200
print(healthcheck('https://example.com/foobar')) # 404
うん、よしよし。うまくいっているようです。あとはこの関数を組み込めばあなたの仕事は終了です!今日もぐっすり眠れることでしょう。
ヘルスチェックする先が複数になった
しかし数日後…。
ヘルスチェッカーでチェックする先が複数に増えると上司から伝えられました。あなたはそれぐらいなら対応はすぐに終わりますと言って、以下に示すような関数healthcheck_n()
を新たに作って複数エンドポイントへの対応を終えました。
def healthcheck_n(urls: Sequence[str]):
return [healthcheck(url) for url in urls]
healthcheck_n()
はhealthcheck()
の複数版で、URLエンドポイントの配列を与えると結果をステータスコードの配列として返します。
result = healthcheck_n([
'https://example.com/',
'https://example.com/foobar'
])
print(result) # [200, 404]
動作も良好です。こんな楽な改修であればいくらでもやりたいぐらいです。しかしあなたはまだ気づいていませんでした。ヘルスチェックする先が100箇所以上だということを…。
ある日、あなたはヘルスチェッカーの不具合を疑われます。1分に1回のヘルスチェックができてないと同僚が言い出したのです。それもそのはず、逐次的にHTTPリクエストをしているので、100件あってそれぞれ3秒待つなら300秒も1周のヘルスチェックにかかることになります。これでは1分周期のヘルスチェックができるはずがありません…。ざんねん!
さてどうする?
逐次的に処理して追いつかないなら打つ手は主にふたつです。
- 性能を上げる。 単純ですが多くの場面で有効な手段です。ただし性能向上できるのが自分の手の届く範囲にあるときだけです。今回はコストや接続先エンドポイントの応答速度にも依存するので難しいところです。
- 同時に複数のリクエストをする。 ひとつずつの処理に速度が出なくても、同時に複数の処理ができるのであれば時間の短縮になります。相手側にある程度の並列処理能力があることが前提です。
今回は同時リクエストを行うふたつの方法について考えていきましょう。
threadingによる並列処理
いろいろと調べるところによれば、むかしからあるスレッドという機能を使えば並列処理できることがわかりました。スレッドにはデーモンスレッドとそうでないスレッドがあります。デーモンスレッドはスレッドを作り出したメインスレッドが終了したときに自動的に終了するスレッドで(この場合リソースは適切に解放されないかもしれません)、そうでないスレッドはメインスレッドが終了してもスレッド内の処理が終わらない限り残り続けます。
スレッドの終了はThread.join()
メソッドで待つことができ、これによって終了を知ることができます。それではスレッドを使ったアプローチに挑戦してみましょう。
import threading
from typing import Sequence
def healthcheck_threads(urls: Sequence[str]):
threads = []
for url in urls:
t = theading.Thread(target=healthcheck, args=(url,))
t.start()
threads.append(t)
for t in threads:
t.join()
できました!
スレッドを使うと並列して同時にHTTPリクエストできるので劇的にヘルスチェックの時間が短くなりました!でも、あれ?何かに気づきませんか?…そう、スレッドは値を返せないのです!
さて困りました。しかしスレッドを作った人は賢いのでこれに対する答えも用意されています。それが同期キューです。間接的にキューを使うことでメインスレッドやサブスレッド間の値の受け渡しが可能となります。スレッドでデータのやり取りをする場合、キューに書き込むタイミングは制御できないので、順序情報が必要であればそれもキューに書き込むなどの工夫が必要です。
import queue
from typing import Sequence
def healthcheck_queue(url: str, i: int, q: queue.Queue):
try:
response = urllib.request.urlopen(url)
except urllib.error.HTTPError as e:
q.put((i, e.code))
q.put((i, response.code))
def healthcheck_threads(urls: Sequence[str]):
q = queue.Queue()
threads = []
for i, url in enumerate(urls):
t = threading.Thread(target=healthcheck_queue, args=(url, i, q))
t.start()
threads.append(t)
for t in threads:
t.join()
return [x[1] for x in sorted(list(q.queue), key=lambda x: x[0])]
同期キューの代わりに通常のリストを使うこともできますが、通常のリストはロック機構がないので、複数スレッド間で同時にアクセスされると結果が保証されません。
また、join()
にはタイムアウトを設定できますが、複数のスレッドを待つ場合にはここでタイムアウトを設定すると最大の待ち時間がスレッド数 * タイムアウト
秒になってしまいます。時間で処理を打ち切りたいのであればサブスレッド側に組み込むことで待ち時間をnスレッド中の最大実行時間
秒に抑えられます。
スレッドは無限に作れるわけではありません。スレッドが増えるとコンテキストスイッチ(スレッドの切り替え)にかかる時間が増えるため、実際には使用するスレッドの最大数を決める必要が出てくるでしょう。例えば次のようなコードを書けば、10個ずつ小分けにしてリクエストできます。
while urls:
for i, url in enumerate(urls[:10]):
# URLリクエストをする
urls = urls[10:]
asyncioによる並列処理
最近の流行りはasyncioです。なんでかって?みんなが使っていてクールだと言うからです!言語によってはasync/awaitやFuture、Promiseと呼ばれていたりしますが、結局みんな同じものを指しています。
通常我々が作っているのは同期的に実行される関数ですが、asyncioの世界では非同期的に実行されるasync関数を定義できます。async関数は非同期的に実行され、いつ終了するのかはわかりませんがいつかは終了し、スレッドと異なり値を返せます。 asyncioはシングルスレッドで動作しますが、タスクスケジューラによってメインスレッドの空き時間にうまく挿入されるので、メインスレッドをブロックしません。
また注意点としてasyncioはシングルスレッドで動作するというものの、それは並列処理されないことと同義ではありません。非同期処理が多重起動される状況においては並列処理されることもあります。場合によってはMutexを使用して同時アクセスを防ぐような対策が必要となります。
さて、それではasyncio版を作ってみましょう。これまでの関数を流用してリクエスト部はこう書けます。
async def healthcheck_async(url: str):
return healthcheck(url)
単純にasync版のhealthcheck_async()
を定義しました。呼び出し側のコードも書きます。
loop = asyncio.get_event_loop()
result = loop.run_until_complete(
healthcheck_async('https://example.com/')
)
print(result) # 200
よさそうですね。ではこれを並列実行します。asyncio.gather()
は複数のタスクを監視し、すべてが終了すると引数の順番どおりに結果を返してくれます。
async def healthcheck_n_async(urls: Sequence[str]):
return await asyncio.gather(*[healthcheck_async(url) for url in urls])
loop = asyncio.get_event_loop()
result = loop.run_until_complete(
healthcheck_n_async(['https://example.com/'] * 10)
)
print(result) # [200, 200, 200, 200, 200, 200, 200, 200, 200, 200]
async関数内でさらにasync関数を呼び出す際にはawait
キーワードを使います。最初のうちは理解するのに苦労しますが、非同期関数内で同期的に待つためのキーワードとして認識しておけば問題ありません。
実行してみましょう。あれ…もしかして…遅い…?結果が返ってくるまで10秒くらいかかるはずです。なぜなら同期的に実行されているからです。なんだって!?
asyncキーワードは待機可能である関数の目印でしかない上に、ここでのhealthcheck()
は同期的に呼び出されているからです。処理をタスクスケジューラに載せるにはもう一手間必要です。そのために最初に書いたhealthcheck_async()
の実装を少し書き換えます。
async def healthcheck_async(url):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, healthcheck, url)
run_in_executor()
で実行する関数をラップすることで非同期するためのタスクスケジューラに登録できます。あとはこれをawaitすれば並列に非同期通信が走るはずです!
もちろんasyncioを使う場合であっても一度に数百のタスクを同時にこなせるわけではないので、スレッドのとき同様いくつか小分けにして並列実行する必要が出てきます。
翌日…
上司に「別の方法でモニタリングするからヘルスチェッカーは不要になった」と言われました。現実はそんなものです。
Discussion