🐈

Connecpy のこと

に公開

ConnecpyというConnect ProtocolのPython実装プロジェクトに関わっています。最初は私が始めたプロジェクトですが、最近は優秀なコントリビュータの方が主要な開発を担っており、素晴らしい進化を遂げています。

なぜConnect Protocolか

gRPCは素晴らしいプロトコルですが、いくつか課題があります。

ブラウザから直接呼べない
HTTP/2の特殊な使い方をするため、ブラウザのfetch APIでは扱えない

プロキシやCDNとの相性
多くのインフラがHTTP/1.1を前提としている

デバッグのしづらさ
curlで気軽に叩けない、リクエスト/レスポンスが見づらい

Connect Protocolは、これらの課題を解決しつつgRPCの良さを保ったプロトコルです。

  • HTTP/1.1でも動く(もちろんHTTP/2も可)
  • 通常のPOSTリクエストとして扱える
  • JSONとProtobufの両方をサポート

PythonのRPCエコシステムの現状

Pythonには既にgrpcioという公式のgRPC実装があります。しかし、Connect Protocolの実装は少なくとも私が作り始めた時(2023年12月頃?)にはありませんでした。

また、grpcioは以下のような特徴があります。

  • C++実装のラッパー(ピュアPythonではない)
  • 独自のイベントループ
  • ASGIやWSGIとの統合が難しい

Connect ProtocolはHTTPの上に薄いレイヤーを被せただけなので、既存のWebフレームワークと統合しやすいはずです。

Connecpyの設計方針

1. まずはUnary(単一リクエスト/レスポンス)から

Connect Protocolは4つの通信パターンをサポートします。

  • Unary(単一リクエスト/レスポンス)
  • Server Streaming
  • Client Streaming
  • Bidirectional Streaming

現在のConnecpy(バージョン2.0.0)はUnaryに集中しています。理由は簡単で、これが最も使用頻度が高く、実装もシンプルだからです。

# 通常のHTTP POSTとして扱える
curl -X POST -H "Content-Type: application/json" \
  -d '{"inches": 12}' \
  http://localhost:3000/package.Service/MakeHat

2. ASGIネイティブ

現代のPython Webアプリケーションは、多くがASGI(Asynchronous Server Gateway Interface)を採用しています。ConnecpyもASGIファーストで設計されています。

重要なのは、Connecpyが生成するアプリケーションは標準的なASGI/WSGIアプリケーションだということです。つまり、既存のASGI/WSGIミドルウェアやフレームワークとシームレスに統合できます。

3. protocプラグインによるコード生成

.protoファイルから型安全なサーバー/クライアントコードを生成します。

# protocプラグインのインストール(Go経由)
go install github.com/i2y/connecpy/v2/protoc-gen-connecpy@latest

# または、GitHubリリースページからバイナリをダウンロード
# https://github.com/i2y/connecpy/releases/latest

# Connecpyパッケージのインストール
pip install connecpy

実装例:帽子屋サービス

1. .protoファイルの定義

syntax = "proto3";

package i2y.connecpy.example;

message Size {
  int32 inches = 1;
}

message Hat {
  int32 size = 1;
  string color = 2;
  string name = 3;
}

service Haberdasher {
  rpc MakeHat(Size) returns (Hat);
}

2. コード生成

protoc --python_out=./ --pyi_out=./ --connecpy_out=./ ./haberdasher.proto

この例ではprotocを使用していますが、もちろんbufでもOKです。

3. サービスの実装

# service.py
import random

from connecpy.code import Code
from connecpy.exceptions import ConnecpyException
from connecpy.server import ServiceContext

from haberdasher_pb2 import Hat, Size


class HaberdasherService:
    async def MakeHat(self, req: Size, ctx: ServiceContext) -> Hat:
        print("remaining_time: ", ctx.timeout_ms())
        
        if req.inches <= 0:
            raise ConnecpyException(
                Code.INVALID_ARGUMENT, 
                "inches: I can't make a hat that small!"
            )
        
        response = Hat(
            size=req.inches,
            color=random.choice(["white", "black", "brown", "red", "blue"]),
        )
        
        if random.random() > 0.5:
            response.name = random.choice(
                ["bowler", "baseball cap", "top hat", "derby"]
            )
        
        return response

4. サーバーの起動

# server.py
import haberdasher_connecpy
from service import HaberdasherService

app = haberdasher_connecpy.HaberdasherASGIApplication(
    HaberdasherService()
)
# Uvicornで起動
uvicorn --port=3000 server:app

# またはDaphne(HTTP/2サポート)
daphne --port=3000 server:app

# またはHypercorn(HTTP/2サポート)
hypercorn --bind :3000 server:app

5. クライアントからの呼び出し

非同期クライアント:

# async_client.py
import asyncio
import httpx
from connecpy.exceptions import ConnecpyException
import haberdasher_connecpy, haberdasher_pb2

server_url = "http://localhost:3000"
timeout_s = 5

async def main():
    async with httpx.AsyncClient(
        base_url=server_url,
        timeout=timeout_s,
    ) as session:
        async with haberdasher_connecpy.HaberdasherClient(
            server_url, 
            session=session
        ) as client:
            try:
                response = await client.MakeHat(
                    haberdasher_pb2.Size(inches=12),
                )
                if not response.HasField("name"):
                    print("We didn't get a name!")
                print(response)
            except ConnecpyException as e:
                print(e.code, e.message)

if __name__ == "__main__":
    asyncio.run(main())

同期クライアント:

# client.py
from connecpy.exceptions import ConnecpyException
import haberdasher_connecpy, haberdasher_pb2

server_url = "http://localhost:3000"
timeout_s = 5

def main():
    with haberdasher_connecpy.HaberdasherClientSync(
        server_url, 
        timeout_ms=timeout_s * 1000
    ) as client:
        try:
            response = client.MakeHat(
                haberdasher_pb2.Size(inches=12),
            )
            if not response.HasField("name"):
                print("We didn't get a name!")
            print(response)
        except ConnecpyException as e:
            print(e.code, e.message)

if __name__ == "__main__":
    main()

様々な呼び出し方法

Connecpyサーバーは標準的なHTTPなので、様々な方法で呼び出せます。

buf curlでProtobuf:

buf curl --data '{"inches": 12}' -v \
  http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat \
  --schema ./haberdasher.proto

curlでJSON:

curl -X POST -H "Content-Type: application/json" \
  -d '{"inches": 12}' -v \
  http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat

ミドルウェアとの統合

Connecpyが生成するアプリケーションは標準的なASGI/WSGIアプリケーションなので、既存のエコシステムと完全に互換性があります。

ASGIミドルウェアの例

from starlette.middleware.cors import CORSMiddleware
from starlette.middleware.trustedhost import TrustedHostMiddleware
from starlette.middleware.authentication import AuthenticationMiddleware

app = haberdasher_connecpy.HaberdasherASGIApplication(
    HaberdasherService()
)

# 複数のミドルウェアを適用
app = CORSMiddleware(app, allow_origins=["*"])
app = TrustedHostMiddleware(app, allowed_hosts=["example.com", "*.example.com"])
# 認証ミドルウェアも追加可能
# app = AuthenticationMiddleware(app, backend=BasicAuthBackend())

FastAPIへのマウント

from fastapi import FastAPI

fastapi_app = FastAPI()

# 通常のFastAPIエンドポイント
@fastapi_app.get("/health")
def health_check():
    return {"status": "ok"}

# ConnecpyサービスをFastAPIにマウント
haberdasher_app = haberdasher_connecpy.HaberdasherASGIApplication(service)
fastapi_app.mount(haberdasher_app.path, haberdasher_app)

WSGIミドルウェアの例(Flask)

from flask import Flask
from werkzeug.middleware.dispatcher import DispatcherMiddleware

flask_app = Flask(__name__)

@flask_app.route("/")
def hello():
    return "Hello from Flask!"

# Connecpy WSGIアプリケーション(生成されたコードで利用可能)
haberdasher_app = haberdasher_connecpy.HaberdasherWSGIApplication(service)

# FlaskとConnecpyを組み合わせる
app = DispatcherMiddleware(flask_app, {
    haberdasher_app.path: haberdasher_app
})

この統合性により、認証、ロギング、レート制限など、既存のミドルウェアをそのまま活用できます。

高度な機能

インターセプター(ASGIのみ)

リクエスト/レスポンスの前後に処理を挟めます。

from typing import Any, Callable
from connecpy.server import ServerInterceptor, ServiceContext
import haberdasher_connecpy
from service import HaberdasherService

class MyInterceptor(ServerInterceptor):
    def __init__(self, msg):
        self._msg = msg
    
    async def intercept(
        self,
        method: Callable,
        request: Any,
        ctx: ServiceContext,
        method_name: str,
    ) -> Any:
        print(f"intercepting {method_name} with {self._msg}")
        return await method(request, ctx)

app = haberdasher_connecpy.HaberdasherASGIApplication(
    HaberdasherService(),
    interceptors=[MyInterceptor("A"), MyInterceptor("B")]
)

圧縮サポート

gzip、brotli、zstandardをサポートしています。

# 非同期クライアントで圧縮を指定
async with haberdasher_connecpy.HaberdasherClient(
    server_url,
    send_compression="br",      # Brotli圧縮でリクエスト
    accept_compression=["gzip"]  # gzip圧縮でレスポンス
) as client:
    response = await client.MakeHat(
        haberdasher_pb2.Size(inches=12)
    )

# 同期クライアントで圧縮を指定
with haberdasher_connecpy.HaberdasherClientSync(
    server_url,
    send_compression="zstd",     # Zstandard圧縮でリクエスト
    accept_compression=["br"]    # Brotli圧縮でレスポンス
) as client:
    response = client.MakeHat(
        haberdasher_pb2.Size(inches=12)
    )

CORSサポート

ConnecpyのASGIアプリケーションは標準的なASGIアプリケーションなので、任意のCORSミドルウェアと組み合わせられます。

from starlette.middleware.cors import CORSMiddleware

app = haberdasher_connecpy.HaberdasherASGIApplication(
    HaberdasherService()
)

# StarletteのCORSミドルウェアを使用
app = CORSMiddleware(
    app,
    allow_origins=["*"],
    allow_methods=["POST", "OPTIONS"],
    allow_headers=["*"],
)

WSGIサポート

レガシーなPythonアプリケーションとも統合できます。WSGIアプリケーションは生成されたコードから直接作成できます(ASGIと同様の方法)。

Proto Editionsサポート

バージョン2.0.0から、Proto Editions 2023もサポートしています。

edition = "2023";

package example;

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

ルーティング

複数のサービスを組み合わせる場合、生成されたアプリケーションのpathプロパティを使用できます。

haberdasher_app = haberdasher_connecpy.HaberdasherASGIApplication(service)
print(haberdasher_app.path)  # "/i2y.connecpy.example.Haberdasher"

# Starletteでのマウント例
from starlette.routing import Mount
from starlette.applications import Starlette

app = Starlette(routes=[
    Mount(haberdasher_app.path, app=haberdasher_app)
])

なぜConnecpyを使うか

Connect Protocolの利点を活かしたいとき

  • ブラウザから直接RPCを呼びたい
  • curlでデバッグしたい
  • 既存のHTTPインフラ(CDN、ロードバランサー)を活用したい

Pythonらしい実装が欲しいとき

  • ASGIやWSGIと自然に統合したい
  • 既存のASGI/WSGIミドルウェアをそのまま使いたい
  • FastAPIやStarlette、Django、Flaskと組み合わせたい
  • asyncioベースの非同期処理を活用したい

段階的な移行を考えているとき

  • 今はHTTP/JSONで始めて、将来的にProtobufに移行したい
  • RESTful APIからRPCへの移行パスが欲しい

まとめ

ConnecpyはConnect ProtocolのシンプルなPython実装です。

HTTPの使いやすさとRPCの型安全性を両立させたConnect Protocolを、Pythonのエコシステムに自然に統合できるように設計しました。

現在はUnaryのみの実装ですが、実用レベルに達しています。標準的なHTTPとして扱えるので、既存のツールやインフラとの相性も良好です。

Connect ProtocolをPythonで使いたい方は、ぜひ試してみてください。

Appendix

リポジトリ
Connecpy

関連プロジェクト
PydanticRPC - Connect Protocolサポートで内部的にConnecpyを使用

Discussion