📘

mmbot作って動かしたけど勝てる気がしないので、コードを公開して一旦諦める

2022/06/12に公開約16,500字

最近mmbotを作ってみて動かしてみたのですが、全く勝てる気がしないので、書いたコードを公開して一旦closeしたい(=一旦諦めたい)と思います。
こちらのコードのロジックを参考にしつつ、pybottersを使ってwebsocketを使ってみたり、という感じです。

https://note.com/magimagi1223/n/n5fba7501dcfd

誰に役立つか

  • 仮想通貨の取引botを作ってみたいけど、何から手をつけたら良いかわからない人
  • pybotters で websocket とかどうやって使うん?(めっちゃ具体的w)
  • influxdbもどうやって使うん?
  • 将来また挑戦したときの筆者

免責事項

  • こちらのコードの動作は保証できません
  • 一旦動かすことを目的にしていたので、バグ取りも終わっていません。(profitsの計算とか間違ってるかも)
  • このコードを参考にしたことから生じたいかなる損害に対しても、筆者は一切の責任を負いかねます。

感想

  • mmbotの大まかな流れは、少し掴むことができた
  • pythonの勉強になった
  • 自分への宿題
    • influxdbのtimestampの指定方法が結局いまいちわかってない

環境

  • python 3.9.9
    • pybotters
  • influxdb
  • bitflyer 現物 JPY/BTC

コード

.
├── bots
│   ├── __init__.py
│   ├── bot.py
│   └── mmbot_20220429.py
├── lib
│   ├── __init__.py
│   ├── db_wrapper.py
│   └── utils.py
└── main.py

main.py

import asyncio
from curses.ascii import DEL
from multiprocessing.connection import wait
from datetime import timezone
from dotenv import load_dotenv
import logging
import datetime
import os
import json
import copy
import numpy

import pybotters


from lib.db_wrapper import DbWrapper
from bots.mmbot_20220429 import Mmbot20220429
from lib.utils import format_board, get_board_diff, sort_executions_by_exec_date, sort_board_by_price, board_average

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

load_dotenv()

# log
logger = logging.getLogger('LoggingTest')
logger.setLevel(10)
fh = logging.FileHandler('./logs/log_mm_bf_' + datetime.datetime.now().strftime(
    '%Y%m%d') + '_' + datetime.datetime.now().strftime('%H%M%S') + '.log')
logger.addHandler(fh)
sh = logging.StreamHandler()
logger.addHandler(sh)
formatter = logging.Formatter(
    '%(asctime)s: %(message)s', datefmt="%Y-%m-%d %H:%M:%S")
fh.setFormatter(formatter)
sh.setFormatter(formatter)

# apis
apis = {
    'bitflyer': [os.environ.get("BF_API_KEY"), os.environ.get("BF_API_SECRET")]
}

async def main():
    async with pybotters.Client(apis=apis, base_url='https://api.bitflyer.com') as api_client:
        async with InfluxDBClientAsync(url=os.environ.get("INFLUX_URL"), org=os.environ.get("INFLUX_ORG"), token=os.environ.get("INFLUX_TOKEN")) as db_client:
            db_wrapper = DbWrapper(db_client, "mmbot20220429")

            public_channels = [
                'lightning_board_snapshot_BTC_JPY', 'lightning_board_BTC_JPY', 'lightning_executions_BTC_JPY']
            private_channels = ['child_order_events', 'parent_order_events']

            store = pybotters.bitFlyerDataStore()
            params = [{'method': 'subscribe', 'params': {'channel': c}}
                      for c in [*public_channels, *private_channels]]
            await api_client.ws_connect(
                'wss://ws.lightstream.bitflyer.com/json-rpc',
                send_str=json.dumps(params),
                hdlr_json=store.onmessage
            )

            # bots
            mmbot1 = Mmbot20220429(api_client, logger, db_wrapper, store)

            while True:
                await store.wait()
                buy_board = copy.deepcopy(store.board.find({'side': 'BUY'})) ## BUYのorderを取得
                buy_board = list(map(lambda o: format_board(o), buy_board))
                sorted_buy_board = sort_board_by_price(buy_board, True)

                sell_board = copy.deepcopy(store.board.find({'side': 'SELL'})) ## sellのorderを取得
                sell_board = list(map(lambda o: format_board(o), sell_board))
                sorted_sell_board = sort_board_by_price(sell_board, False)

                if len(sorted_buy_board) == 0 or len(sorted_sell_board) == 0:
                    continue

                executions = store.executions.find() ## 約定履歴を取得
                if len(executions) == 0:
                    continue

                executions = sort_executions_by_exec_date(executions, True)
                price = executions[0]['price']

                await asyncio.gather(*[
                    mmbot1.logic(sorted_buy_board, sorted_sell_board, price)
                ])
                await asyncio.sleep(1)


if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

utils.py

import numpy


def format_board(order):
    """delete  side and product_code and price * side"""
    del order['side']
    del order['product_code']
    order['sipr'] = order['price'] * order['size']
    return order


def sort_board_by_price(board, is_descending):
    """sort board by price"""
    return sorted(board, key=lambda o: o['price'], reverse=is_descending)


def sort_executions_by_exec_date(executions, is_descending):
    """sort executions by exec_date"""
    return sorted(executions, key=lambda o: o['exec_date'], reverse=is_descending)

bot.py

import time

from pybotters import bitFlyerDataStore

from lib.db_wrapper import DbWrapper


class Bot:
    def __init__(self, client, logger, db: DbWrapper, store: bitFlyerDataStore):
        self.client = client
        self.logger = logger
        self.db = db
        self.store = store

        self.status = {
            "jpy": 0,
            "btc": 0,
            "profits": 0,
            "price": 0,
        }
        self.active_orders = []

    async def send_order(self, side, size, price):
        try:
            resp = await self.client.post('/v1/me/sendchildorder', data={
                "product_code": "BTC_JPY",
                "child_order_type": "LIMIT",
                "side": side,
                "price": price,
                "size": size,
                "time_in_force": "GTC"
            })
            value = await resp.json()
            if value['child_order_acceptance_id'] != None:
                await self.db.write(
                    value['child_order_acceptance_id'], side, size, price)
        except Exception as e:
            print('send_order error', e)
        return value

    async def get_status(self, ids=[]):
        while True:
            try:
                resp = await self.client.get('/v1/me/getchildorders', params={
                    "product_code": "BTC_JPY",
                    "child_order_acceptance_id": ids
                })
                value = await resp.json()
                value = value[0]
                break
            except Exception as e:
                self.logger.info(e)
                time.sleep(2)

        # APIで受け取った値を読み換える
        if value['child_order_state'] == 'ACTIVE':
            status = 'open'
        elif value['child_order_state'] == 'COMPLETED':
            status = 'closed'
            self.db.update_is_completed(id)
        else:
            status = value['child_order_state']

        # 未約定量を計算する
        remaining = float(value['size']) - float(value['executed_size'])

        time.sleep(0.1)
        return {'child_order_acceptance_id': value['child_order_acceptance_id'], 'status': status, 'filled': value['executed_size'], 'remaining': remaining, 'size': value['size'], 'price': value['price']}

    def get_active_order_ids(self):
        return list(map(lambda t: t['child_order_acceptance_id']), [
            * self.status['buy_active_orders'], *
            self.status['sell_active_orders']
        ])

    def get_active_order_ids(self):
        return list(map(lambda o: o['child_order_acceptance_id'], self.active_orders))

    def get_store_bot_orders(self):
        orders = self.store.childorders.find()
        return list(
            filter(lambda o: o['child_order_acceptance_id'] in self.get_active_order_ids(), orders))

    def get_total_assets(self):
        return self.status['jpy'] + self.status['btc'] * self.status['price']

    def get_store_not_active_order_ids(self):
        not_active_types = ['CANCEL', 'COMPLETE', 'ORDER_FAILED', 'EXPIRE']
        not_active_orders = list(filter(
            lambda t: t['event_type'] in not_active_types, self.get_store_bot_orders()))
        return list(map(lambda o: o['child_order_acceptance_id'], not_active_orders))

    def get_execution_orders(self):
        orders = self.store.childorderevents.find({'event_type': 'EXECUTION'})
        return list(
            filter(lambda o: o['child_order_acceptance_id'] in self.get_active_order_ids(), orders))

    async def update_status(self, price):
        completed = self.get_execution_orders()
        commission_rate = 0.0015  # [btc / btc]
        completed_buy = list(filter(lambda t: t['side'] == 'BUY', completed))
        buy_btc = sum(list(map(lambda t: t['size'], completed_buy)))
        buy_used_jpy = sum(list(
            map(lambda t: t['price'] * t['size'] * (1 + commission_rate), completed_buy)))

        completed_sell = list(filter(lambda t: t['side'] == 'SELL', completed))
        sell_jpy = sum(
            list(map(lambda t: t['price'] * t['size'], completed_buy)))
        sell_used_btc = sum(
            list(map(lambda t: t['size'] * (1 + commission_rate), completed_sell)))
        now_btc = buy_btc - sell_used_btc
        now_jpy = sell_jpy - buy_used_jpy

        profits = self.status['profits'] + now_jpy + now_btc * price

        self.active_orders = self.get_store_bot_orders()

        self.status = {
            'jpy': now_jpy,
            'btc': now_btc,
            'price': price,
            "profits": profits
        }

        await self.db.write_status(self.status, self.get_total_assets())

    def remove_active_order_by_id(self, id):
        self.active_orders = list(
            filter(lambda t: t['child_order_acceptance_id'] != id, self.active_orders))

    async def cancel(self, id):
        try:
            resp = await self.client.post('/v1/me/cancelchildorder', data={
                "product_code": "BTC_JPY",
                "child_order_acceptance_id": id
            })
            self.remove_active_order_by_id(id)
        except Exception as e:
            self.logger.info(e)
            value = await self.get_status(id)

        time.sleep(0.5)
        return value

    def active_sell_orders(self):
        return list(filter(lambda o: o['side'] == 'SELL', self.active_orders))

    def active_buy_orders(self):
        return list(filter(lambda o: o['side'] == 'BUY', self.active_orders))

wb_wrapper.py

  • influxdbの書き込みなど
  • typeのconflictが発生するのを避けるためfloat()にした
from influxdb_client import Point
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import timezone
from dotenv import load_dotenv
import os
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


class DbWrapper:
    def __init__(self, db_client, bot_name):
        self.db_client = db_client
        self.bot_name = bot_name
        self.write_api = self.db_client.write_api()

    async def write(self, order_id, side, size, price):
        _point1 = Point(self.bot_name).tag(
            "record_type", "order").field('order_id', order_id).field("side", side).field("size", float(size)).field("price", float(price)).field("all", float(size*price))
        successfully = await self.write_api.write(bucket="trade", record=[_point1])

    async def write_status(self, status, total):
        _point = Point(self.bot_name).tag(
            "record_type", "assets").field("btc", float(status["btc"])).field("jpy", float(status["jpy"])).field(
                "price", float(status["price"])).field("profits", float(status["profits"]))

        await self.write_api.write(bucket="trade", record=[_point])

mmbot_20220429.py

import asyncio
import time

from .bot import Bot

#------------------------------------------------------------------------------#
# parameters
SIZE_MARGIN = 0.01
LOT = 0.001
PRICE_DELTA = 1
SPREAD_ENTRY_TH = 0.0005
SPREAD_CANCEL_TH = 0.0005
MAX_POSITION = 0.002  # BTC
#------------------------------------------------------------------------------#


class Mmbot20220429(Bot):
    def __init__(self, client, logger, db, store):
        super().__init__(client, logger, db, store)

        # initialize
        self.pos_amount = 0

    def pos(self):
        if len(self.active_orders) == 0:
            return 'none'
        if len(self.active_buy_orders()) != 0:
            return 'entry_buy'
        if len(self.active_sell_orders()) != 0:
            return 'entry_sell'
        return 'entry'

    def get_effective_price(self, ask_board, bid_board, ask_trades, bid_trades, size_margin):
        # ask (buy)
        size = 0
        ask_index = 0
        while size <= size_margin and ask_index < len(ask_board):
            size += ask_board[ask_index]['size']

            for trade in ask_trades:
                if ask_board[ask_index]['size'] == trade['price']:
                    size -= trade['size']
            ask_index += 1

        # bid (sell)
        size = 0
        bid_index = 0
        while size <= size_margin and bid_index < len(bid_board):
            size += bid_board[bid_index]['size']

            for trade in bid_trades:
                if bid_board[bid_index]['size'] == trade['price']:
                    size -= trade['size']
            bid_index += 1

        return ask_board[ask_index - 1]['price'], bid_board[bid_index - 1]['price']

    def get_order_size(self, position, max_position):
        # buy_order_size
        buy_order_size = 0
        if position >= max_position:
            buy_order_size = 0
        elif position < max_position:
            buy_order_size = max_position - \
                (max_position/4) * ((position+max_position)/max_position)**2
        elif position <= -max_position:
            buy_order_size = max_position

        # sell_order_size
        sell_order_size = 0
        if position >= max_position:
            sell_order_size = max_position
        elif position < max_position:
            sell_order_size = (max_position/4) * \
                ((position+max_position)/max_position)**2
        elif position <= -1 * max_position:
            sell_order_size = 0
        return max(round(buy_order_size, 7), 0.001), max(round(sell_order_size, 7), 0.001)

    async def logic(self, sell_board, buy_board, price):
        await self.update_status(price)

        ask, bid = self.get_effective_price(
            buy_board, sell_board, [], [], SIZE_MARGIN)
        spread = (ask - bid) / bid

        if self.pos() == 'none':
            print('loop pos none')

            if spread > SPREAD_ENTRY_TH:
                buy_order_size, sell_order_size = self.get_order_size(
                    self.pos_amount, MAX_POSITION)
                order_price = ask - PRICE_DELTA
                sell_order = await self.send_order('SELL', sell_order_size, order_price)
                if 'error_message' not in sell_order:
                    sell_order = {
                        'child_order_acceptance_id': sell_order['child_order_acceptance_id'],
                        'side': 'SELL',
                        'size': sell_order_size,
                        'price': order_price,
                        'timestamp': int(time.time()),
                        'status': 'open',
                        'outstanding_size': sell_order_size
                    }
                    self.active_orders.insert(0, sell_order)

                order_price = bid + PRICE_DELTA
                buy_order = await self.send_order('BUY', buy_order_size, order_price)
                if 'error_message' not in buy_order:
                    buy_order = {
                        'child_order_acceptance_id': buy_order['child_order_acceptance_id'],
                        'side': 'BUY',
                        'size': buy_order_size,
                        'price': order_price,
                        'timestamp': int(time.time()),
                        'status': 'open',
                        'outstanding_size': buy_order_size
                    }
                    self.active_orders.insert(0, buy_order)

        pos = self.pos()
        if pos == 'entry' or pos == 'entry_buy' or pos == 'entry_sell':

            if len(self.active_sell_orders()) > 0:
                sell_order = self.active_sell_orders()
                sell_order = sell_order[0]
                if 'outstanding_size' in sell_order and sell_order['outstanding_size'] < 0.001:
                    await self.cancel(sell_order['child_order_acceptance_id'])

            if len(self.active_buy_orders()) > 0:
                buy_order = self.active_buy_orders()
                buy_order = buy_order[0]
                if 'outstanding_size' in buy_order and buy_order['outstanding_size'] < 0.001:
                    await self.cancel(buy_order['child_order_acceptance_id'])
        await asyncio.sleep(0.5)

Discussion

ログインするとコメントできます