🙆‍♀️

Web3.pyで AWS Managed Blockchainに接続する

2021/12/19に公開

背景

Web3.pyはEthereumのPythonインターフェイスとして広く使われています。Web3.pyはインターフェイスですから、通信先のノードは別途で用意する必要があります。ノードは自前でGethを立ち上げるという手段のほか、運用の手間を省くならAlchemyやInfuraなどのフルノードサービスを使う手段もあります。また、AWSにもManaged Blockchainというサービスがあり、同様の機能が提供されています。Web3.pyからManaged Blockchainに接続する場合、単にHTTPエンドポイントを入力すればいいという話ではなかったので、設定手順をまとめました。

自分の環境

本メモにおいて、Pythonのバージョンは3.9、web3.pyのバージョンは5.25.0を前提としています。

同期I/O

まずは簡単に同期I/Oで動かす場合です。ノードに同時に多数のリクエストを送るような使い方を想定しなければ、これで十分です。

AWSのAPIへのリクエストではV4署名による認証が必要です。自前で署名コードを実装するのは大変なので、aws-requests-authというモジュールを使います。https://pypi.org/project/aws-requests-auth/
pip install aws-requests-auth

import requests_auth_aws_sigv4 import AWSSigV4
from web3 import Web3, HTTPProvider

url = "https://..... .amazonaws.com"  # Managed Blockchain立ち上げると発行されるHTTPエンドポイント
aws_auth = AWSSigV4("managedblockchain", region="your region")  # regionはap-northeast-1とか

w3 = Web3(HTTPProvider(url, request_kwargs={"auth": aws_auth})

あとはw3に対してw3.eth.get_transaction(...)のようにリクエストを送ればいいです。

非同期I/O

ノードに同時にたくさんリクエストを送りたい場合のために、Web3.pyはAsyncIOに対応しています。ただ、AWS Managed Blockchainのノードで非同期I/Oを実現するには、自前で書かなければならないコードが多く生じました。

from typing import Any
import requests
from requests import PreparedRequest
import requests_auth_aws_sigv4 import AWSSigV4
from web3 import Web3, AsyncHTTPProvider
from web3._utils.request import async_make_post_request
from web3._utils.rpc_abi import RPC
from web3.eth import AsyncEth, BaseEth
from web3.method import Method, default_root_munger
from web3.net import AsyncNet
from web3.providers import BaseProvider
from web3.types import RPCEndpoint, RPCResponse

url = "https://..... .amazonaws.com"
aws_auth = AWSSigV4("managedblockchain", region="your region")

# [解説ポイント1]
def derive_headers_for_aws_service(method str, url str, data=None, headers=None, params=None):
    p = PreparedRequest()
    p.prepare(
        method=method.upper(),
	url=url,
	data=data,
	headers=headers,
	params=params,
	auth=aws_auth
    )
    return p.headers
	

class AsyncAMBHTTPProvider(AsyncHTTPProvider, BaseProvider):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
        request_daa = self.encode_rpc_request(method, params)
	request_kwargs = self.get_request_kwargs()
	headers = derive_headers_for_aws_service("POST", self.endpoint_uri, request_data, **request_kwargs)  # [解説ポイント2]
	request_kwargs["headers"] = headers
	raw_response = await async_make_post_request(
	    self.endpoint_uri,
	    request_data,
	    **request_kwargs
	)
	response = self.decode_rpc_response(raw_response)
	return response				   

呼び出し例

import asyncio

transactions = ["0x.....", "0x.....", ...]

async def process():
    w3 = Web3(AsyncAMBHTTPProvider(url),
            modules={"eth": (AsyncEth,), "net": (AsyncNet,)},  # [解説ポイント3]
	    middlewares=[])
    responses = await asyncio.gather(*[w3.eth.get_transaction(tx) for tx in transactions])
    return responses
    
asyncio.run(process())

コードの解説

Web3に渡す引数をHTTPProviderからAsyncHTTPProviderに変えるだけでは上手く動きません。その理由は、AsyncHTTPProvider内部ではaiohttpを使っており、requestsは使っていないからです。同期I/Oではrequest_kwargs={"auth": aws_auth}として認証情報を連携していましたが、aiohttpに即した実装を必要があります。

とは言え、AWSのV4署名を自前で実装するのは骨が折れます。そこでハックしたのが、関数derive_headers_for_aws_serviceです[解説ポイント1]。この関数を実行すると、実際にrequestsでHTTPリクエストを送ったりはしませんが、aws-requests-authによってヘッダに署名を追記する処理まで行います。そこで生まれるヘッダをasync_make_post_requestに渡しています[解説ポイント2]。

[解説ポイント3]に示すmodulesやmiddlewaresは、参考にした例に載っていたので真似しました。これらを省くと動きません。

足りないコマンド

こうしてAsyncHTTPProviderベースで構築したweb3オブジェクトですが、Gethが対応しているはずのすべてのコマンドが使えませんでした。具体的に例えば、a3.eth.get_transaction_receiptを実行しようとしたら存在していないとのエラーがでました。公式のドキュメントを見たらAsyncHTTPProviderはunstable扱いのようでしたので、まだ色々と不備があるのでしょう。ただ、これは割と簡単にモンキーパッチ(※)できます。
※: モンキーパッチ(Monkey patch)とは、プログラムを実行範囲内でアドホックに修正するテクニック

以下のコードをWeb3()のコンストラクタを呼ぶ前に実行すればいいです。

BaseEth._get_transaction_receipt = Method(
    RPC.eth_getTransactionReceipt,
    mungers=[default_root_munger]
)

async def get_transaction_receipt(self, transaction_hash):
    return await self._get_transaction_receipt(transaction_hash)
    
AsyncEth.get_transaction_receipt = get_transaction_receipt

Discussion