📘
mmbot作って動かしたけど勝てる気がしないので、コードを公開して一旦諦める
最近mmbotを作ってみて動かしてみたのですが、全く勝てる気がしないので、書いたコードを公開して一旦closeしたい(=一旦諦めたい)と思います。
こちらのコードのロジックを参考にしつつ、pybottersでwebsocketも使ってみた、という感じです。
誰に役立つか
- 仮想通貨の取引botを作ってみたいけど、何から手をつけたら良いかわからない人
- pybotters で websocket とかどうやって使うん?(めっちゃ具体的w)
- influxdbもどうやって使うん?
- 将来また挑戦したときの筆者
免責事項
- こちらのコードの動作は保証できません
- 一旦動かすことを目的にしていたので、バグ取りも終わっていません。(profitsの計算とか間違ってるかも)
- このコードを参考にしたことから生じたいかなる損害に対しても、筆者は一切の責任を負いかねます。
感想
- mmbotの大まかな流れは、少し掴むことができた
- pythonの勉強になった
- 自分への宿題
- influxdbのtimestampの指定方法が結局いまいちわかってない
環境
- python 3.9.9
- pybotters
- influxdb
- bitflyer 現物 JPY/BTC
コード
.
├── bots
│ ├── __init__.py
│ ├── bot.py
│ └── mmbot_20220429.py
├── lib
│ ├── __init__.py
│ ├── db_wrapper.py
│ └── utils.py
└── main.py
main.py
import asyncio
from curses.ascii import DEL
from multiprocessing.connection import wait
from datetime import timezone
from dotenv import load_dotenv
import logging
import datetime
import os
import json
import copy
import numpy
import pybotters
from lib.db_wrapper import DbWrapper
from bots.mmbot_20220429 import Mmbot20220429
from lib.utils import format_board, get_board_diff, sort_executions_by_exec_date, sort_board_by_price, board_average
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
load_dotenv()
# log
# Every message goes to a timestamped file under ./logs and to the console.
# The directory is created up front because logging.FileHandler does not
# create missing parent directories and would fail on a fresh checkout.
os.makedirs('./logs', exist_ok=True)
logger = logging.getLogger('LoggingTest')
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
    '%(asctime)s: %(message)s', datefmt="%Y-%m-%d %H:%M:%S")
# A single now() call keeps the date and time parts of the file name consistent.
fh = logging.FileHandler(
    './logs/log_mm_bf_'
    + datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    + '.log')
fh.setFormatter(formatter)
logger.addHandler(fh)
sh = logging.StreamHandler()
sh.setFormatter(formatter)
logger.addHandler(sh)
# apis
# Credential table handed to pybotters.Client in main(): exchange name mapped
# to [key, secret], both read from the environment (None when unset).
apis = {
    'bitflyer': [os.getenv(var) for var in ("BF_API_KEY", "BF_API_SECRET")],
}
async def main():
    """Connect to bitFlyer and InfluxDB, then run the bot once per store update.

    Opens the pybotters REST/WebSocket client and the async InfluxDB client,
    subscribes to the public board/execution channels and the private order
    event channels, and loops forever: wait for the data store to change,
    rebuild both sides of the board, take the most recent execution price and
    hand everything to the bot's ``logic`` coroutine.
    """
    async with pybotters.Client(apis=apis, base_url='https://api.bitflyer.com') as api_client:
        async with InfluxDBClientAsync(
            url=os.environ.get("INFLUX_URL"),
            org=os.environ.get("INFLUX_ORG"),
            token=os.environ.get("INFLUX_TOKEN"),
        ) as db_client:
            db_wrapper = DbWrapper(db_client, "mmbot20220429")

            # Public channels first, then the private (authenticated) ones.
            channels = [
                'lightning_board_snapshot_BTC_JPY',
                'lightning_board_BTC_JPY',
                'lightning_executions_BTC_JPY',
                'child_order_events',
                'parent_order_events',
            ]
            store = pybotters.bitFlyerDataStore()
            subscriptions = [
                {'method': 'subscribe', 'params': {'channel': channel}}
                for channel in channels
            ]
            await api_client.ws_connect(
                'wss://ws.lightstream.bitflyer.com/json-rpc',
                send_str=json.dumps(subscriptions),
                hdlr_json=store.onmessage
            )

            # bots
            mmbot1 = Mmbot20220429(api_client, logger, db_wrapper, store)

            while True:
                await store.wait()

                # BUY side of the board, highest price first. The rows are
                # deep-copied before format_board touches them.
                buy_rows = copy.deepcopy(store.board.find({'side': 'BUY'}))
                sorted_buy_board = sort_board_by_price(
                    [format_board(row) for row in buy_rows], True)

                # SELL side of the board, lowest price first.
                sell_rows = copy.deepcopy(store.board.find({'side': 'SELL'}))
                sorted_sell_board = sort_board_by_price(
                    [format_board(row) for row in sell_rows], False)

                if not sorted_buy_board or not sorted_sell_board:
                    continue

                # Execution history; the newest entry supplies the current price.
                executions = store.executions.find()
                if not executions:
                    continue
                newest_execution = sort_executions_by_exec_date(executions, True)[0]

                await asyncio.gather(
                    mmbot1.logic(
                        sorted_buy_board, sorted_sell_board, newest_execution['price'])
                )
                await asyncio.sleep(1)
if __name__ == "__main__":
    # Script entry point: run the bot until it is interrupted. Ctrl-C is
    # swallowed so stopping the bot does not print a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
utils.py
import numpy
def format_board(order):
    """Return a trimmed copy of one board level with its notional value added.

    The ``side`` and ``product_code`` keys are dropped (if present) and a
    ``sipr`` key holding ``price * size`` is added.

    Args:
        order: mapping with at least ``price`` and ``size``.

    Returns:
        A new dict; ``order`` itself is left untouched, so rows taken directly
        from a shared data store are not corrupted by formatting them.

    Raises:
        KeyError: if ``price`` or ``size`` is missing.
    """
    formatted = {
        key: value
        for key, value in order.items()
        if key not in ('side', 'product_code')
    }
    formatted['sipr'] = order['price'] * order['size']
    return formatted
def sort_board_by_price(board, is_descending):
    """Return a new list of board levels ordered by their ``price`` value.

    Args:
        board: iterable of mappings that each have a ``price`` key.
        is_descending: True for highest price first, False for lowest first.
    """
    def price_of(level):
        return level['price']

    ordered = list(board)
    ordered.sort(key=price_of, reverse=is_descending)
    return ordered
def sort_executions_by_exec_date(executions, is_descending):
    """Return a new list of executions ordered by their ``exec_date`` value.

    Args:
        executions: iterable of mappings that each have an ``exec_date`` key.
        is_descending: True for the latest ``exec_date`` first.
    """
    def exec_date_of(execution):
        return execution['exec_date']

    ordered = list(executions)
    ordered.sort(key=exec_date_of, reverse=is_descending)
    return ordered
bot.py
import asyncio
import time

from pybotters import bitFlyerDataStore

from lib.db_wrapper import DbWrapper
class Bot:
    """Shared plumbing for bitFlyer BTC_JPY bots: REST order calls, order tracking, status.

    Attributes (all set in ``__init__``):
        client: HTTP client used for the private REST calls (``post`` / ``get``).
        logger: logger used to report errors.
        db: wrapper used to persist accepted orders and status snapshots.
        store: data store queried for child orders and child order events.
        status: dict with the keys ``jpy``, ``btc``, ``profits`` and ``price``.
        active_orders: list of order dicts this bot is currently tracking.
    """

    # The annotations are quoted so they are never evaluated at import time.
    def __init__(self, client, logger, db: "DbWrapper", store: "bitFlyerDataStore"):
        self.client = client
        self.logger = logger
        self.db = db
        self.store = store
        self.status = {
            "jpy": 0,
            "btc": 0,
            "profits": 0,
            "price": 0,
        }
        self.active_orders = []

    async def send_order(self, side, size, price):
        """Send a GTC limit order and record it in the DB when it is accepted.

        Args:
            side: ``'BUY'`` or ``'SELL'``.
            size: order size.
            price: limit price.

        Returns:
            The decoded JSON response. If the request itself fails, a dict
            with an ``error_message`` key is returned instead of raising, so
            callers can always test ``'error_message' in result``.
        """
        try:
            resp = await self.client.post('/v1/me/sendchildorder', data={
                "product_code": "BTC_JPY",
                "child_order_type": "LIMIT",
                "side": side,
                "price": price,
                "size": size,
                "time_in_force": "GTC"
            })
            value = await resp.json()
        except Exception as e:
            self.logger.info('send_order error: %s', e)
            return {'error_message': str(e)}

        acceptance_id = None
        if isinstance(value, dict):
            acceptance_id = value.get('child_order_acceptance_id')
        if acceptance_id is not None:
            # A failed DB write must not hide an order the exchange accepted.
            try:
                await self.db.write(acceptance_id, side, size, price)
            except Exception as e:
                self.logger.info('send_order db write error: %s', e)
        return value

    async def get_status(self, ids=None):
        """Fetch the first child order matching ``ids`` and normalise its state.

        Args:
            ids: value sent as ``child_order_acceptance_id``; the parameter is
                omitted from the request when this is empty or None.

        Returns:
            A dict with ``child_order_acceptance_id``, ``status`` (``'open'``
            for ACTIVE, ``'closed'`` for COMPLETED, otherwise the raw state),
            ``filled``, ``remaining``, ``size`` and ``price``.

        The request is retried every two seconds until it yields at least one
        order. NOTE(review): there is no retry limit, so this never returns
        if the exchange keeps answering with an error or an empty list.
        """
        params = {"product_code": "BTC_JPY"}
        if ids:
            params["child_order_acceptance_id"] = ids

        while True:
            try:
                resp = await self.client.get('/v1/me/getchildorders', params=params)
                value = (await resp.json())[0]
                break
            except Exception as e:
                self.logger.info(e)
                # Non-blocking pause: time.sleep here would stall the event loop.
                await asyncio.sleep(2)

        # Translate the state reported by the API.
        state = value['child_order_state']
        if state == 'ACTIVE':
            status = 'open'
        elif state == 'COMPLETED':
            status = 'closed'
            await self._mark_completed(value['child_order_acceptance_id'])
        else:
            status = state

        # Size that has not been executed yet.
        remaining = float(value['size']) - float(value['executed_size'])
        await asyncio.sleep(0.1)
        return {
            'child_order_acceptance_id': value['child_order_acceptance_id'],
            'status': status,
            'filled': value['executed_size'],
            'remaining': remaining,
            'size': value['size'],
            'price': value['price'],
        }

    async def _mark_completed(self, acceptance_id):
        """Tell the DB wrapper an order completed, if it supports that hook."""
        hook = getattr(self.db, 'update_is_completed', None)
        if not callable(hook):
            return
        result = hook(acceptance_id)
        if hasattr(result, '__await__'):
            await result

    def get_active_order_ids(self):
        """Return the acceptance IDs of every order in ``active_orders``."""
        return [order['child_order_acceptance_id'] for order in self.active_orders]

    def get_store_bot_orders(self):
        """Return the store's child orders whose acceptance ID this bot tracks."""
        orders = self.store.childorders.find()
        active_ids = set(self.get_active_order_ids())
        return [o for o in orders if o['child_order_acceptance_id'] in active_ids]

    def get_total_assets(self):
        """Return ``jpy + btc * price`` computed from the current status."""
        return self.status['jpy'] + self.status['btc'] * self.status['price']

    def get_store_not_active_order_ids(self):
        """Return IDs of this bot's store orders whose event type is terminal."""
        not_active_types = ['CANCEL', 'COMPLETE', 'ORDER_FAILED', 'EXPIRE']
        return [
            order['child_order_acceptance_id']
            for order in self.get_store_bot_orders()
            if order['event_type'] in not_active_types
        ]

    def get_execution_orders(self):
        """Return EXECUTION events that belong to this bot's tracked orders."""
        events = self.store.childorderevents.find({'event_type': 'EXECUTION'})
        active_ids = set(self.get_active_order_ids())
        return [e for e in events if e['child_order_acceptance_id'] in active_ids]

    async def update_status(self, price):
        """Recompute balances from execution events, refresh orders, persist status.

        Args:
            price: current price used to value the BTC balance.
        """
        executions = self.get_execution_orders()
        commission_rate = 0.0015  # [btc / btc]

        buys = [e for e in executions if e['side'] == 'BUY']
        buy_btc = sum(e['size'] for e in buys)
        buy_used_jpy = sum(
            e['price'] * e['size'] * (1 + commission_rate) for e in buys)

        sells = [e for e in executions if e['side'] == 'SELL']
        # JPY proceeds come from the SELL executions.
        sell_jpy = sum(e['price'] * e['size'] for e in sells)
        sell_used_btc = sum(e['size'] * (1 + commission_rate) for e in sells)

        now_btc = buy_btc - sell_used_btc
        now_jpy = sell_jpy - buy_used_jpy
        # NOTE(review): this adds the current balances to the previous
        # ``profits`` on every call, so the figure keeps accumulating; confirm
        # whether a running sum is really what was intended.
        profits = self.status['profits'] + now_jpy + now_btc * price

        self.active_orders = self.get_store_bot_orders()
        self.status = {
            'jpy': now_jpy,
            'btc': now_btc,
            'price': price,
            "profits": profits
        }
        await self.db.write_status(self.status, self.get_total_assets())

    def remove_active_order_by_id(self, id):
        """Stop tracking the order with the given acceptance ID."""
        self.active_orders = [
            order for order in self.active_orders
            if order['child_order_acceptance_id'] != id
        ]

    async def cancel(self, id):
        """Request cancellation of an order and return its status afterwards.

        The order is dropped from ``active_orders`` only when the cancel
        request did not raise; the status lookup happens either way.
        """
        try:
            await self.client.post('/v1/me/cancelchildorder', data={
                "product_code": "BTC_JPY",
                "child_order_acceptance_id": id
            })
            self.remove_active_order_by_id(id)
        except Exception as e:
            self.logger.info(e)
        value = await self.get_status(id)
        await asyncio.sleep(0.5)
        return value

    def active_sell_orders(self):
        """Return the tracked orders whose side is SELL."""
        return [order for order in self.active_orders if order['side'] == 'SELL']

    def active_buy_orders(self):
        """Return the tracked orders whose side is BUY."""
        return [order for order in self.active_orders if order['side'] == 'BUY']
db_wrapper.py
- influxdbの書き込みなど
- フィールドの型のconflictが発生するのを避けるため、数値はすべてfloat()に変換した
from influxdb_client import Point
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import timezone
from dotenv import load_dotenv
import os
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
class DbWrapper:
    """Thin wrapper that writes one bot's orders and status snapshots to InfluxDB.

    Every point uses the bot name as its measurement and carries a
    ``record_type`` tag (``order`` or ``assets``). Numeric fields are passed
    through ``float()`` so a field never switches between int and float.
    """

    def __init__(self, db_client, bot_name, bucket="trade"):
        """
        Args:
            db_client: client object whose ``write_api()`` returns the write API.
            bot_name: measurement name used for every point.
            bucket: destination bucket; defaults to ``"trade"``.
        """
        self.db_client = db_client
        self.bot_name = bot_name
        self.bucket = bucket
        self.write_api = self.db_client.write_api()

    async def write(self, order_id, side, size, price):
        """Record one accepted order and return whatever the write call returns.

        ``all`` stores ``size * price``; both operands are converted first so
        numeric strings are accepted as well.
        """
        size = float(size)
        price = float(price)
        point = (
            Point(self.bot_name)
            .tag("record_type", "order")
            .field('order_id', order_id)
            .field("side", side)
            .field("size", size)
            .field("price", price)
            .field("all", size * price)
        )
        return await self.write_api.write(bucket=self.bucket, record=[point])

    async def write_status(self, status, total):
        """Record a status snapshot.

        Args:
            status: mapping with ``btc``, ``jpy``, ``price`` and ``profits``.
            total: total asset value; stored in the ``total`` field (it used
                to be accepted but never written).
        """
        point = (
            Point(self.bot_name)
            .tag("record_type", "assets")
            .field("btc", float(status["btc"]))
            .field("jpy", float(status["jpy"]))
            .field("price", float(status["price"]))
            .field("profits", float(status["profits"]))
            .field("total", float(total))
        )
        await self.write_api.write(bucket=self.bucket, record=[point])
mmbot_20220429.py
import asyncio
import time
from .bot import Bot
# ---------------------------------------------------------------------------- #
# Strategy parameters
# ---------------------------------------------------------------------------- #

# Cumulative board size that get_effective_price() walks past before it
# settles on a price level.
SIZE_MARGIN = 0.01

# Not referenced by the code in this module.
LOT = 0.001

# Offset applied to the effective ask (subtracted) and the effective bid
# (added) to obtain the limit prices of the bot's own orders.
PRICE_DELTA = 1

# Relative spread, (ask - bid) / bid, that must be exceeded before entering.
SPREAD_ENTRY_TH = 0.0005

# Not referenced by the code in this module.
SPREAD_CANCEL_TH = 0.0005

# Maximum position handed to get_order_size().
MAX_POSITION = 0.002  # BTC
# ---------------------------------------------------------------------------- #
class Mmbot20220429(Bot):
    """Market-making bot that quotes one SELL and one BUY limit order inside the spread.

    When it tracks no orders and the relative spread exceeds
    ``SPREAD_ENTRY_TH`` it places both orders; afterwards it cancels a tracked
    order once its ``outstanding_size`` falls below ``MIN_ORDER_SIZE``.
    """

    # Floor applied to computed order sizes, and the outstanding size below
    # which a tracked order is cancelled.
    MIN_ORDER_SIZE = 0.001

    def __init__(self, client, logger, db, store):
        super().__init__(client, logger, db, store)
        # initialize
        # NOTE(review): nothing in this class updates pos_amount, so
        # get_order_size() is always called with position 0.
        self.pos_amount = 0

    def pos(self):
        """Summarise the tracked orders.

        Returns:
            ``'none'`` when nothing is tracked, ``'entry_buy'`` when a BUY
            order is tracked, ``'entry_sell'`` when only SELL orders are
            tracked, and ``'entry'`` otherwise.
        """
        if not self.active_orders:
            return 'none'
        if self.active_buy_orders():
            return 'entry_buy'
        if self.active_sell_orders():
            return 'entry_sell'
        return 'entry'

    def get_effective_price(self, ask_board, bid_board, ask_trades, bid_trades, size_margin):
        """Return ``(ask, bid)`` prices found at a depth of ``size_margin``.

        Each board is walked from its first level, accumulating ``size`` (less
        the size of any trade at that level's price) until the total exceeds
        ``size_margin`` or the board ends; the price of the last level visited
        is returned.

        Raises:
            IndexError: if either board is empty.
        """
        return (
            self._price_at_depth(ask_board, ask_trades, size_margin),
            self._price_at_depth(bid_board, bid_trades, size_margin),
        )

    @staticmethod
    def _price_at_depth(board, trades, size_margin):
        """Walk one side of the board and return the price where the depth is reached."""
        size = 0
        index = 0
        while size <= size_margin and index < len(board):
            level = board[index]
            size += level['size']
            for trade in trades:
                # Trades are matched to a level by price.
                if level['price'] == trade['price']:
                    size -= trade['size']
            index += 1
        return board[index - 1]['price']

    def get_order_size(self, position, max_position):
        """Return ``(buy_size, sell_size)`` skewed by the current position.

        At or beyond ``+max_position`` the raw sizes are ``(0, max_position)``;
        at or beyond ``-max_position`` they are ``(max_position, 0)``. In
        between they follow a quadratic in ``position``. Both results are
        rounded to 7 decimals and floored at ``MIN_ORDER_SIZE``.
        """
        if position >= max_position:
            buy_order_size, sell_order_size = 0, max_position
        elif position <= -max_position:
            # Must be tested before the general case, which would otherwise
            # also match and shrink the buy size again below -max_position.
            buy_order_size, sell_order_size = max_position, 0
        else:
            skew = (max_position / 4) * \
                ((position + max_position) / max_position) ** 2
            buy_order_size = max_position - skew
            sell_order_size = skew
        return (
            max(round(buy_order_size, 7), self.MIN_ORDER_SIZE),
            max(round(sell_order_size, 7), self.MIN_ORDER_SIZE),
        )

    async def _place_order(self, side, size, price):
        """Send one limit order; when accepted, track it at the front of active_orders."""
        response = await self.send_order(side, size, price)
        if not isinstance(response, dict) or 'error_message' in response:
            self.logger.info('%s order rejected: %s', side, response)
            return None
        acceptance_id = response.get('child_order_acceptance_id')
        if acceptance_id is None:
            self.logger.info('%s order response has no acceptance id: %s', side, response)
            return None
        order = {
            'child_order_acceptance_id': acceptance_id,
            'side': side,
            'size': size,
            'price': price,
            'timestamp': int(time.time()),
            'status': 'open',
            'outstanding_size': size
        }
        self.active_orders.insert(0, order)
        return order

    async def logic(self, sell_board, buy_board, price):
        """Run one iteration: refresh status, maybe enter, maybe cancel.

        Args:
            sell_board: board forwarded as the *bid* side to
                ``get_effective_price``.
            buy_board: board forwarded as the *ask* side to
                ``get_effective_price``.
            price: latest execution price, used for the status update.

        NOTE(review): main.py passes the BUY-side board as ``sell_board`` and
        the SELL-side board as ``buy_board``, so the names read as swapped
        even though ask and bid end up computed from the matching sides.
        """
        await self.update_status(price)
        ask, bid = self.get_effective_price(
            buy_board, sell_board, [], [], SIZE_MARGIN)
        spread = (ask - bid) / bid

        if self.pos() == 'none':
            print('loop pos none')
            if spread > SPREAD_ENTRY_TH:
                buy_order_size, sell_order_size = self.get_order_size(
                    self.pos_amount, MAX_POSITION)
                # Quote just inside the effective ask and bid.
                await self._place_order('SELL', sell_order_size, ask - PRICE_DELTA)
                await self._place_order('BUY', buy_order_size, bid + PRICE_DELTA)

        if self.pos() in ('entry', 'entry_buy', 'entry_sell'):
            # SELL side first, then BUY; each list is read only after the
            # previous side has been handled.
            for list_orders in (self.active_sell_orders, self.active_buy_orders):
                orders = list_orders()
                if not orders:
                    continue
                order = orders[0]
                if 'outstanding_size' in order and order['outstanding_size'] < self.MIN_ORDER_SIZE:
                    await self.cancel(order['child_order_acceptance_id'])
        await asyncio.sleep(0.5)
Discussion