🐵
FXのデータをリアルタイムに非同期で受け取る方法
記事にするか迷ったがGMO FX普及活動のため書き溜めておく
必要なライブラリはpybottersです.
リンク先に導入方法があります.
通貨ペアはドル円です
"symbol": "USD_JPY"
を変えることで他の通貨も可能
詳しくはGMO ドキュメントにて
コメントアウトしてます
ご自身でカスタマイズして使ってください
import os
import pybotters
import asyncio
class FX:
def __init__(self) -> None:
self.data = {} # ここにティッカーのデータが入る
# websocket
async def fx_ws(self, client):
client.ws_connect(
'wss://forex-api.coin.z.com/ws/public/v1',
send_json={
"command": "subscribe",
"channel": "ticker",
"symbol": "USD_JPY"
},
hdlr_json=self.onmessage
)
# ハンドラー
def onmessage(self, msg, ws):
self.data = msg
# データを1秒ごとに出力
async def data_loop(self):
while True:
print(self.data)
await asyncio.sleep(1)
# 外から呼ぶメインの関数
async def main(self):
async with pybotters.Client() as client:
await self.fx_ws(client)
async with asyncio.TaskGroup() as tg:
tg.create_task(
self.data_loop()
)
if __name__ == '__main__':
bot = FX()
# もしOSがwindowsなら呪文を唱える
if os.name == 'nt':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
try:
# bot起動
asyncio.run(bot.main())
except KeyboardInterrupt:
pass
Discussion