🐍

botterのためのasyncio

14 min read 3

Zennの方々は初めまして。まちゅけん(@MtkN1XBt)と申します😎
Twitterなどで仮想通貨のシステムトレード(自動取引bot)に関する情報を発信しています。

私はこれまではnoteに投稿していたのですが、やはりMarkdownで書けるZennの方が馴染むかもしれないと思いと、仮想通貨界隈の技術系記事をZennに開拓していきたいという思いから温めていた記事を投稿してみます!

この記事について

今回はasyncioaiohttpライブラリを用いて、仮想通貨システムトレードを例としたすぐに使える 実践的な非同期通信の解説 をしたいと思います。
「非同期処理」で検索するとよく抽象的な概念の説明があります。根源的に理解するのであればそちらを読むのがいいでしょうが、やはりまず手を動かして実際に通信を行いながら理解する方が効率がいい、と私は考えています。

サクサクと HTTP通信WebSocket通信 のサンプルコードを載せていきますので、その中で非同期通信のメリットと簡単に概念を説明してきます。
この記事を全て理解すれば非同期通信を用いたbotを作成できるようになります✨

事前準備

Python環境にaiohttpをインストールしましょう。

pip install aiohttp

現在ステーブルリリースのPython 3.9を対象にしています。以前のバージョンであると動作しない可能性があります。
環境構築などが分からない方はこちらの記事を参考にしてみてください。

https://note.com/mtkn1/n/nbc33e765558b

Windows環境だとイベントループに関するRuntimeErrorが発生することがあります。
Windows側の不具合 [1] のようで、以降でasyncio.run(main())と記載されるコードの1行上に下記のコードを追加してください。そうすることでエラーは回避されます。
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

REST API

Basic Usage 1

では早速皆さんご存じの取引所 Bybit のREST APIを利用してHTTP通信を行っていきます。
Ticker情報 を取得してみましょう。

BTCUSDのTicker情報を取得して表示できましたね。
これがPythonにおける非同期処理の基本的な書き方であり、非同期通信の最もミニマムな例です。

各行の説明としては、

  • 1-2行目で必要なimportを行っています
  • 4-8行目で非同期通信を行う コルーチン関数 [2] を定義しています
  • 10行目でコルーチン関数をasyncioライブラリで実行しています

4-8行目の非同期通信の説明としては、

  • 5行目で非同期コンテキストマネージャーを用いて HTTPクライアントセッション [3] を作成しています
  • 6行目でまた非同期コンテキストマネージャーを用いて HTTPリクエスト(GET)を行いレスポンスを待機 ( HTTP通信 [4] )します ( HTTPヘッダ が取得されます)
  • 7行目でまた通信を行い HTTPボディ が取得されるまで待機します
  • 7行目の終わりでコネクションをリリースします
  • 8行目で取得したHTTPボディ(Ticker情報)をprintで表示します
  • 8行目の終わりでセッション終了処理で根底コネクタを閉じます

といった流れになります。
最低限の知識として、コルーチン関数はasync defを付けて定義する必要があります。非同期処理はコルーチン関数中でのみ行えます。
非同期コンテキストマネージャーで実装されているクラス(aiohttp.ClientSession)はasync withで開く必要があります。
コルーチン関数で実装されているメソッド(resp.text)はawait文で実行する必要があります。

Basic Usage 2

先ほどのコードはかなりリファレンスに寄せた書き方になっていました。
もう少し実践的な書き方を紹介します。

変わった点としては、

  • 6行目で非同期コンテキストマネージャーではなくawait文を用いてリクエストをしている
  • 6行目で クエリー文字列 をURLに直接ではなくparams引数に指定している
  • 7行目でtextメソッドではなくjsonメソッドを用いてJSONデータを辞書形式に変換している
  • 8行目で辞書から"result"キーをprintで表示している

session.get等のリクエストのメソッドについては、非同期コンテキストマネージャーではなくawait文で実行するのがいいと思います。インデントが増えてしまうのを避けることができます。

非同期コンテキストマネージャーについてのうんちく

このリクエストのコンテキストマネージャーの終了処理(コネクションの解放)は、デストラクタで暗黙的に実行されます [5]
botコードにおいては基本的に無限ループを用いて取引を行うので、デストラクタは頻繁に呼ばれると思われます。その為コネクションの解放はデストラクタに任せてしまってもいいかと考えています。
※クライアントセッションの終了処理は暗黙的に行われないので非同期コンテキストマネージャーを利用しましょう

取引所のAPIは銘柄や金額等のパラメーターを可変で変更する場合が多いのでparams引数を利用しましょう。(f-stringsでもよいと思います)

取引所のAPIは基本的にJSONデータで配信されるのでjsonメソッドで辞書(またはリスト)に変換しましょう。辞書であればキーを指定してprintで表示できます。

printを見やすくするrich

richというライブラリを利用すると辞書(またはリスト)のprintが大変見やすくなります。取引所のAPIを理解しやすくなるのでおすすめです。必要であればこちらのコマンドでインストールしimportを追加してprintを置き換えてみてください。

pip install rich
from rich import print

同期的な非同期処理

さて、非同期処理の基本形を紹介しました。
先ほどのコードは単一のリクエストでしたので非同期通信のメリットが分かりません。
次は非同期処理が得意としそうな複数のリクエストを行うサンプルを紹介します。
BybitのTicker情報で複数(4つ)の銘柄を取得します。

先ほどとの違いは、

  • 5-8行目で「リクエストをし、ボディを取得し、それを返す」非同期通信のコルーチン関数を作成
  • 11行目でsessionをfetch関数で利用するためにglobal宣言しています(これについては単にsession引数を渡す手間を省く私の好みの書き方です)
  • 16-19行目で await文 でfetch関数を実行してreturn値を取得
  • あとは結果を格納してprintで表示したり、実行時間の計測をしています

asyncioを使っているのでこれで非同期に4銘柄を取得できた! と思われるかもしれませんが、それは間違っています。
これは 非同期通信を1つずつ同期的に 行っているだけです。
最後にprintで表示した実行時間を覚えておいてください。(私の環境は0.4sec程)

BybitのTicker情報のAPIはこの様なロジックでなくても、銘柄指定なしで全銘柄の情報を取得できる仕様です。あくまで非同期処理の解説として用いています。

本当の非同期処理

それではようやく本番です。本当の非同期処理を実行してみましょう。

これで4並行でリクエストが行えてます。先ほどより実行時間が速いのが分かりますね。(私の環境は0.1sec程)
1行ずつ解説していきます。

  • 16-19行目でfetch関数の コルーチンオブジェクト を生成します
  • 20-23行目でasyncio.create_taskでコルーチンオブジェクトをTask化しその実行を イベントループ にスケジュールします
  • 24-27行目で await文 でスケジュールした各Taskが終了するのを待機します
  • 28-31行目で各Taskの結果(fetch関数のreturn値)を取り出します

前節との最も重要な違いは20-23行目でコルーチンをTask化し実行を イベントループ にスケジュールし、それを24-27行目で各Taskが終了するのを await文 で待機しているところです。
前節ではイベントループにスケジュールせずに直接await文で実行していました。ここで分かるのは await文は "実行する" というよりも、awaitを行った行で制御を離してイベントループに任せて "awaitしたオブジェクトが終了するまで待機する" という命令 であることです。
なので具体的な処理の流れはこのようになります。

前節

  1. BTCUSDのリクエストをしてレスポンスを待ち、ボディの取得を待つ
  2. 上記が終わったらETHUSDのリクエストをしてレスポンスを待ち、ボディの取得を待つ
  3. 上記が終わったらEOSUSDのリクエストをしてレスポンスを待ち、ボディの取得を待つ
  4. 上記が終わったらXRPUSDのリクエストをしてレスポンスを待ち、ボディの取得を待つ

今回

  1. BTCUSDをfetchするTaskをスケジュールする
  2. ETHUSDをfetchするTaskをスケジュールする
  3. EOSUSDをfetchするTaskをスケジュールする
  4. XRPUSDをfetchするTaskをスケジュールする
  5. スケジュールされたBTCUSDのTaskが終了するまで待機を始める(24行目)
  6. BTCUSDのリクエストを投げてレスポンスが来るまで待機を始める(6行目)
  7. BTCUSDのレスポンスが待ち状態なので、次にスケジュールされているETHUSDのリクエストを投げてレスポンスの待機を始める
  8. もしBTCUSDのレスポンスが来ていたらボディ取得の待機を始める、来ておらず先にETHUSDのレスポンスがきたらボディを取得開始したり次にスケジュールされているEOSUSDのリクエストを投げて待機などをする
  9. BTCUSDのボディ取得まで終わってTaskが終了していたら24行目のawaitが終了する
  10. スケジュールされたETHUSDのTaskが終了するまで待機を始める(25行目)
  11. もしETHUSDのTaskが終了していたら25行目は即終了する 、まだ待ち状態だったらETHUSDのTaskを待ちつつスケジュールされているEOSUSDのボディ取得やXRPUSDのリクエスト・・・などが実行される
  12. 27行目の待機が終わったら最終的に全ての非同期通信が終了する

非同期処理というのはこのように順序関係なく連鎖的に処理が連なるのが本質です。
asyncioが便利なのはcreate_taskでスケジュールしたらイベントループが賢く管理してくれるところです。
レスポンス待機しはじめたら次にスケジュールされているリクエストを投げて先にあっちが終わってたら次はこっち...など自動的にやってくれます。
またシングルスレッドでの動作なのでthreadingやmultiprocessingよりも効率的だしGIL [6] に悩まされることもありません。

また、16-19行目で流れを分解して分かりやすく コルーチンオブジェクト を生成して変数に代入していますが、変数に代入せず20-23行目でtask1 = asyncio.create_task(fetch(...))するのと同義です。
つまり、前節の data1 = await fetch(...) というコードは先ほど書いたように「awaitでコルーチン関数を実行してる」のではなく、「fetch(...)でコルーチンオブジェクトを生成して、それをawaitで終了待機してる」ということがよく分かりますね。

fetch関数の中にprint(f"{url}のリクエストを開始/終了")などを挟んでみると並行具合が分かるので是非試してみてください。

gatherを使おう

先ほどの例で本当に並行な非同期処理が行えましたが、asyncioにはもっと簡潔な書き方があるのでそれを紹介します。

asyncio.gatherを利用しました。処理の中身としては先ほどと全く変わりません(実行時間も変わりません)。
asyncio.gatherはコルーチンオブジェクトを渡してあげるだけで自動でTask化しスケジュールされ全てが終了するまで並行で待機します。return値は引数で渡した順に返ってきます。
非常に便利なのでbotコードにおいて複数リクエストを投げる場合はgatherを利用しましょう。

as_completedを使おう

gatherは「非同期処理が全て終了するまで並行で待機する」機能でした。
次は「非同期処理で早い者勝ちで並行で待機する」機能を紹介します。

asyncio.as_completedを利用しました。gatherと違ってprintで表示されるresultsの中身が順番通りではないことが分かります。(通信具合によっては順番になるかもしれないです)
このサンプルにおいての実行時間はgatherと変わりありません。
24行目以降に早い順で取得したデータに対してさらに何かの非同期処理を行う場合にメリットがあります。gatherで全てが終了したから次の非同期処理をするよりも早くスケジュールを行うことが可能です。

その他の機能

より細かくTaskを並行実行(待機)できるwaitやタイムアウトを設定できるwait_forなどがあります。
また、非同期処理は通信処理(I/Oバウンド)におけるメリットはあってもガンガンCPUを使ってテクニカル指標を計算する処理(CPUバウンド)には適していません。しかしto_threadを使うことによってasyncio内でスレッドの立ち上げを簡単に行えて同じようにスケジュール管理できる機能などもあります。

ここまで学習できた方は是非Pythonのasyncioドキュメントを呼んで理解を深めていきましょう。

https://docs.python.org/ja/3/library/asyncio-task.html

WebSocket

Basic Usage

では次にWebSocket APIを利用していきましょう。WebSocketはREST APIと同じく取引所からデータを取得する手段の1つです。しかしその特性は全く異なります。
REST APIはリクエストをしレスポンスを取得したらサーバーとのコネクションは終了しますが、WebSocketは常にサーバーとのコネクションを維持して通信を行うプロトコルです。
リアルタイムでデータを受信することが可能ですが、その分実装難易度が上がります。

asyncioを利用しない場合、WebSocketのコネクションを維持しつつREST APIで取引を行うには必然的にマルチスレッドが必要となります。しかしながらasyncioの場合は非同期で効率的にWebSocketの受信を行うことが可能なので、やはり非同期処理に優位性があります。

では同じくBybitのWebSocket APIを利用して接続を行ってみましょう。
Instrument情報 (=Ticker)のトピックを購読してみます。

WebSocketは基本的には永続的にデータを受信し続けるので、終了したい場合は Ctrl+C で処理を抜けてください。

  • 6行目の非同期コンテキストマネージャー(ws_connect)でWebSocket接続を行います
  • 7行目のコルーチン関数(ws.send_json)でトピックの購読要求を送信します
  • 8行目の非同期イテレーターで(ws)をイテレーションしてWebSocketメッセージを受信するまで待機します
  • 10行目でJSON形式のデータを辞書に変換しprintで表示します(9-12行目は慣例的な書き方です)
  • 再び8行目のイテレーションでWebSocketメッセージを受信するまで待機します
  • Ctrl+Cを押すとイテレーションから抜け、非同期コンテキストマネージャーの終了処理がされプログラムが終了します

特に難しい部分はないと思いますが不思議なのは async for の部分でしょうか。
これはws変数に入っているaiohttp.ClientWebSocketResponseに実装されている非同期イテレーターです。
行われていることはシンプルで、WebSocketメッセージがあるまで8行目で待機され受信後9行目に移ります。
for文というと有限なループのイメージですが、このaiohttpのWebSocketにおけるasync forは基本的には無限ループです(通信不良やpingの仕様などでは切断されます)。

また先ほどと違いmsg.json()のjsonメソッドが非同期ではありません。
これも単純に、REST APIとWebSocketのプロトコルが違う為コルーチン関数で実装される必要がないからです。
REST APIでは HTTPレスポンス を受信してから HTTPボディ を受信すると説明しました。WebSocketはそのような概念はなく WebSocketメッセージ を受信するだけです。その受信は8行目で終わっているので、jsonメソッドは変換する処理(CPUバウンド)のみなのでコルーチン関数ではない訳です。

WebSocketも本当の非同期処理にする

先ほどのコードでWebSocketの接続ができましたが、イテレーションしてprintしているだけでREST APIによる取引のコードが書けません(※注文の送信は基本的にREST APIのみです)。

前章のREST APIでの同期的に非同期処理をしているようなものなので、WebSocketコネクションをTask化して本当の非同期処理にしてみましょう。
あまり意味のあるロジックではないですが、WebSocketでInstrumentを購読しつつREST APIでTicker情報を取得しみます。

  • 4-11行目で先ほどのWebSocketの処理をコルーチン関数(run_forever)として定義しました
  • 16行目でそれをTask化して実行をスケジュールします
  • 17行目でTicker情報の非同期リクエストが投げられレスポンスの待機を開始します
  • スケジュールされているrun_foreverのTaskが開始されWebSocketのレスポンスの待機を開始します
  • Ticker情報のレスポンスが早ければ18行目に移り、WebSocketのレスポンスが早ければトピック購読の非同期処理(6行目)に移ります
  • Ticker情報(HTTPボディ)が取得されたら19-21行目でprintで表示されます(WebSocket側もprintしているので見やすいようにしています)
  • 22行目でWebSocketのTask終了まで待機します(無限ループなので実質的に終了しません)
  • Ctrl+Cを入力するとWebSocketのTaskから抜けてプログラムが終了します

こちらも前章を理解して頂いていたらそれほど難しくないと思います。
16行目のTask化を増やせば複数のWebSocketに接続することができますね。

ちなみに22行目のawait wstaskのようにWebSocketの無限ループTaskの終了待機をすることはbotコードにおいては基本的にはありません。while Trueでbotロジックを記述していくことになりますね。

ここまで分かれば非同期通信を用いたbotを作成できるまであとわずかです!

非同期bot作成への道

API認証という障壁

REST APIとWebSocketの章を理解したあなたは比較的簡単に非同期処理を書けるようになっていることでしょう。

しかし実際にトレードを行うには プライベートAPI (注文など)をリクエストするので API認証 が必要です。これまでの説明は全てパブリックAPIを利用していました。
API認証はリクエストヘッダーを編集したり、クエリー文字列を編集したりするなどの処理を追加してやる必要があります。

aiohttpを用いてBybitで注文を送信するには、具体的にこのような処理が必要になります(市場の300ドル下に1ドルで買い指値を入れるコードです、実行する場合は7行目を書き換えてください)。

実際に注文が入ります!
テストには残高が必要です。テスト後のキャンセル忘れなどご注意ください。

16-19行目が認証ロジックです。
BybitだけでもGETとPOSTリクエスト、さらにはWebSocket認証の3種類が存在します。それが取引所ごとに異なっており、リクエスト毎にこの処理が必要です。

botコードにおいてこのような認証ロジックは共通関数等にモジュール化しなければ、取引ロジックを集中して書けないでしょう。
認証を解決してくれるライブラリとしてccxt [7] があります。ccxtは認証を自動でやってくれて一応非同期処理も可能です。
しかし、ここまで説明してきたようなエンドポイント(URL)を入力してリクエストする形式ではなく、data = bybit.fetchTicker(symbol="BTC/USD")と独自メソッドを利用する形式となっており、私はあまり好きではありません。
一番の問題としては WebSocketが有料でしか利用できません

そこでどうしても非同期処理でbotを組みたかった私は aiohttpベースで自動認証ロジックを搭載したライブラリを開発しました

pybotters

Twitter等の拡散で読んでいる方の多くはご存じだったかもしれませんね😂
しかしZenn初投稿ですので初めて知った方や、知っていたが非同期は難しそうで利用できずにいた方は、これまでの知識で十分に活用できると思います。

https://github.com/MtkN1/pybotters

pybottersは上記のような認証の煩雑さを解決したaiohttpベースのライブラリです。
記事執筆現在、以下の取引所のAPI認証に対応しています。

  • Bybit
  • Binance
  • FTX
  • Phemex
  • BitMEX
  • bitFlyer
  • GMO Coin
  • Liquid
  • bitbank
  • Coincheck

さらにpybottersはaiohttpベースというだけではなく、bot開発をしやすいように痒い所に手が届くような機能を追加しています。

  • PyPIに登録しているのでpip install pybottersでインストール可能
  • API認証はホスト名から自動で行う為、共通関数さえ呼ぶ必要がない
  • REST APIのベースURLを設定可能
  • WebSocketの接続は自動でTask化を行う
  • WebSocketの切断対策のロジックを搭載、自動的に再接続を行う
  • WebSocketのデータをハンドリングして扱いやすい形式で保管する(DataStore)
  • 型ヒントなどの実装

では先ほどの注文コードをpybottersで書き換えるとこうなります。

testnetを利用する場合は、4行目の辞書キー名をbybit_testnetに書き換えてください。
詳しくはpybottersドキュメント参照。

ほぼ不要なコードがなくなったと思いませんか?
WebSocketに関してはこれだけで接続できます!

WebSocket章のロジックが組み込まれているので、説明した内容はほぼほぼ不要になってしまってますね笑

pybottersを利用すれば、WebSocketに接続してリアルタイムでデータを溜め込み、それを元に注文を出す 高頻度取引bot も手軽に実装できると思います。
さらに非同期処理の力を生かせば他のbotterの一歩先に立てるのではないでしょうか?
記事の前半の例ではTicker情報のリクエストを例にサンプルコードを記載しましたが、それをpybottersで注文のリクエストに置き換えるだけです!

上記GitHubリポジトリのWikiには使い方の情報がまとまっているので是非ご利用・ご覧ください!

おわりに

今回は非同期処理に関する記事をZennに初めて投稿させて頂きました。
(hoheto氏の投稿 [8] に触発されました)
いつもはnoteに投稿していることもあり、こちらは未開拓なので是非拡散・Zennのフォローして頂けると幸いです。

Twitterではbot開発に関する情報を発信していますので是非フォローお願いします🙇‍♂️

https://twitter.com/MtkN1XBt

GitHubリポジトリにStar頂けると励みになります🙇‍♂️
またオープンソースプロジェクトですのでどなたでも開発に参加頂けます🙇‍♂️

https://github.com/MtkN1/pybotters

議論が活発なDiscordコミュニティもあるので是非ご参加ください🙇‍♂️

https://discord.com/invite/CxuWSX9U69

noteでも情報発信しているのでフォロー&スキ頂けると幸いです🙇‍♂️

https://note.com/mtkn1
脚注
  1. https://github.com/aio-libs/aiohttp/issues/4324 ↩︎
  2. https://docs.python.org/ja/3/library/asyncio-task.html#coroutines ↩︎
  3. https://developer.mozilla.org/ja/docs/Web/HTTP/Session ↩︎
  4. https://developer.mozilla.org/ja/docs/Web/HTTP/Messages ↩︎
  5. https://github.com/aio-libs/aiohttp/blob/master/aiohttp/client_reqrep.py#L737 ↩︎
  6. https://docs.python.org/ja/3/glossary.html#term-global-interpreter-lock ↩︎
  7. https://github.com/ccxt/ccxt ↩︎
  8. https://zenn.dev/hoheto/articles/e27fb00d3ccbd1 ↩︎

この記事に贈られたバッジ

Discussion

はじめまして。非常に勉強になる記事をありがとうございます。
写経させて頂いていたのですが、order2.pyの注文を出すところで、以下エラーメッセージが解消できません。接続先はtestnet, pybottersのバージョンは0.6.0, 環境はWindowsです。order1.pyでは注文を出せているのでAPIキーは正しく、しつこく見直したのでタイポも無いはずですが。。。エラーの原因について、もし分かるようでしたら教えて頂けないでしょうか。

{'ret_code': 10001, 'ret_msg': 'empty param of timestamp', 'ext_code': '', 'ext_info': '', 'result': None, 'time_now': '1631634790.108881'}

上の質問について、ドキュメントを見て自己解決しました。※apisのキーを"bybit_testnet"に変更。
お騒がせしました。

コメントありがとうございます!
自己解決されていてよかったです👍 testnetで試される方も多いかもしれないので、この旨の補足を追記させて頂きますね。

ログインするとコメントできます