✨
【Python】asyncio と h2 で HTTP/2 サーバーをつくる
サンプルコード(asyncio-server.py)の asyncio に関する部分が古くなっていたので修正することになった。
pip install h2
import asyncio
import io
import json
import ssl
import collections
from typing import List, Tuple
from h2.config import H2Configuration
from h2.connection import H2Connection
from h2.events import (
ConnectionTerminated, DataReceived, RemoteSettingsChanged,
RequestReceived, StreamEnded, StreamReset, WindowUpdated
)
from h2.errors import ErrorCodes
from h2.exceptions import ProtocolError, StreamClosedError
from h2.settings import SettingCodes
# https://github.com/python-hyper/h2/blob/master/examples/asyncio/asyncio-server.py
RequestData = collections.namedtuple('RequestData', ['headers', 'data'])
class H2Protocol(asyncio.Protocol):
def __init__(self):
config = H2Configuration(client_side=False, header_encoding='utf-8')
self.conn = H2Connection(config=config)
self.transport = None
self.stream_data = {}
self.flow_control_futures = {}
def connection_made(self, transport: asyncio.Transport):
self.transport = transport
self.conn.initiate_connection()
self.transport.write(self.conn.data_to_send())
def connection_lost(self, exc):
for future in self.flow_control_futures.values():
future.cancel()
self.flow_control_futures = {}
def data_received(self, data: bytes):
try:
events = self.conn.receive_data(data)
except ProtocolError as e:
self.transport.write(self.conn.data_to_send())
self.transport.close()
else:
self.transport.write(self.conn.data_to_send())
for event in events:
if isinstance(event, RequestReceived):
self.request_received(event.headers, event.stream_id)
elif isinstance(event, DataReceived):
self.receive_data(
event.data, event.flow_controlled_length, event.stream_id
)
elif isinstance(event, StreamEnded):
self.stream_complete(event.stream_id)
elif isinstance(event, ConnectionTerminated):
self.transport.close()
elif isinstance(event, StreamReset):
self.stream_reset(event.stream_id)
elif isinstance(event, WindowUpdated):
self.window_updated(event.stream_id, event.delta)
elif isinstance(event, RemoteSettingsChanged):
if SettingCodes.INITIAL_WINDOW_SIZE in event.changed_settings:
self.window_updated(None, 0)
self.transport.write(self.conn.data_to_send())
def request_received(self, headers: List[Tuple[str, str]], stream_id: int):
headers = collections.OrderedDict(headers)
method = headers[':method']
request_data = RequestData(headers, io.BytesIO())
self.stream_data[stream_id] = request_data
def stream_complete(self, stream_id: int):
try:
request_data = self.stream_data[stream_id]
except KeyError:
return
headers = request_data.headers
body = request_data.data.getvalue().decode('utf-8')
data = json.dumps(
{"headers": headers, "body": body}, indent=4
).encode("utf8")
response_headers = (
(':status', '200'),
('content-type', 'application/json'),
('content-length', str(len(data))),
('server', 'asyncio-h2'),
)
self.conn.send_headers(stream_id, response_headers)
asyncio.ensure_future(self.send_data(data, stream_id))
def receive_data(self, data: bytes, flow_controlled_length: int, stream_id: int):
try:
stream_data = self.stream_data[stream_id]
except KeyError:
self.conn.reset_stream(
stream_id, error_code=ErrorCodes.PROTOCOL_ERROR
)
else:
stream_data.data.write(data)
self.conn.acknowledge_received_data(flow_controlled_length, stream_id)
def stream_reset(self, stream_id):
if stream_id in self.flow_control_futures:
future = self.flow_control_futures.pop(stream_id)
future.cancel()
async def send_data(self, data, stream_id):
while data:
while self.conn.local_flow_control_window(stream_id) < 1:
try:
await self.wait_for_flow_control(stream_id)
except asyncio.CancelledError:
return
chunk_size = min(
self.conn.local_flow_control_window(stream_id),
len(data),
self.conn.max_outbound_frame_size,
)
try:
self.conn.send_data(
stream_id,
data[:chunk_size],
end_stream=(chunk_size == len(data))
)
except (StreamClosedError, ProtocolError):
break
self.transport.write(self.conn.data_to_send())
data = data[chunk_size:]
async def wait_for_flow_control(self, stream_id):
f = asyncio.Future()
self.flow_control_futures[stream_id] = f
await f
def window_updated(self, stream_id, delta):
if stream_id and stream_id in self.flow_control_futures:
f = self.flow_control_futures.pop(stream_id)
f.set_result(delta)
elif not stream_id:
for f in self.flow_control_futures.values():
f.set_result(delta)
self.flow_control_futures = {}
async def main():
ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ctx.load_cert_chain('localhost.pem', 'localhost-key.pem')
ctx.set_alpn_protocols(['h2', 'http/1.1'])
loop = asyncio.get_running_loop()
server = await loop.create_server(
lambda: H2Protocol(),
'127.0.0.1', 8000, ssl=ctx)
async with server:
await server.serve_forever()
asyncio.run(main())
h2 の使い方を学ぶ上で data_received
の定義のうち events = self.conn.receive_data(data)
の行を理解できればよい。クライアントからのデータが解析されるとイベントオブジェクト (events
) が生成される。
具体的な HTTP/2 フレームがどのように解析されるのかは次のコードを参照
>>> import h2.connection
>>> import h2.events
>>> data = b'\x00\x00\x00\x04\x00\x00\x00\x00\x00'
>>> c = h2.connection.H2Connection()
>>> events = c.receive_data(data)
>>> [event for event in events]
[<RemoteSettingsChanged changed_settings:{}>]
>>> [isinstance(event, h2.events.RemoteSettingsChanged) for event in events]
[True]
Discussion