🐵

FXのデータをリアルタイムに非同期で受け取る方法

2024/08/15に公開

記事にするか迷ったがGMO FX普及活動のため書き溜めておく
必要なライブラリはpybottersです.
リンク先に導入方法があります.
通貨ペアはドル円です
"symbol": "USD_JPY"を変えることで他の通貨も可能
詳しくはGMO ドキュメントにて
コメントアウトしてます
ご自身でカスタマイズして使ってください

import os
import pybotters
import asyncio

class FX:
    def __init__(self) -> None:
        self.data = {}  # ここにティッカーのデータが入る

    # websocket
    async def fx_ws(self, client):
        client.ws_connect(
            'wss://forex-api.coin.z.com/ws/public/v1',
            send_json={
                "command": "subscribe",
                "channel": "ticker",
                "symbol": "USD_JPY"
            },
            hdlr_json=self.onmessage
            )

    # ハンドラー
    def onmessage(self, msg, ws):
        self.data = msg

    # データを1秒ごとに出力
    async def data_loop(self):
        while True:
            print(self.data)
            await asyncio.sleep(1)

    # 外から呼ぶメインの関数
    async def main(self):
        async with pybotters.Client() as client:
            await self.fx_ws(client)
            async with asyncio.TaskGroup() as tg:
                tg.create_task(
                    self.data_loop()
                )




if __name__ == '__main__':
    bot = FX()
    # もしOSがwindowsなら呪文を唱える
    if os.name == 'nt':
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    try:
        # bot起動
        asyncio.run(bot.main())
    except KeyboardInterrupt:
        pass

Discussion