SQLAlchemyでmaster/slaveの切り替え
RDBを使用していて、アクセス数が増えて負荷がかかった際の対処法としては、更新用と参照用でRDBを分けるレプリケーションというものがあります。
PythonのORMであるSQLAlchemyを使用していて、ORMからRDBにアクセスする際には、参照用と更新用に分けたいという状況もあると思います。
RDBのレプリケーションを実施した際に、SQLAlchemyに更新用と参照用を切り替えるための処理の追加方法を紹介します。
レプリケーションとは
ここでレプリケーションを知らない方のために簡単に解説します。
MySQL/PostgresqlといったRDBを運用していて、アクセス数などが増大していけば、システムの負荷が高くなり、パフォーマンスが低下していきます。
その対処として、レプリケーションというRDBを更新用と参照用に分ける
方法です。
参照とはselect文が実行されるような処理を指し、insertやupdateといった処理は更新を指します。
更新用のDBをMaster、参照用のことをSlaveと言って、Master/Slave構成と言ったもします。
※最近ではMaster/Slaveといいかたは、「主人と奴隷」を指す差別用語だと言われることも多いため、メインとレプリカといったりする言い方が一般的になってきているようです。
一般的に更新系よりも参照系の方が実行される回数が多いため、参照系の処理ならSlaveの方にアクセスして、更新系ならMasterの方にアクセスするようにします。
参照系であるSlaveは更新系のMasterの情報と同期しており、Master側が更新されれば、瞬時に参照系のSlaveにも同じ内容が渡って同期されます。
出典:https://mariadb.com/resources/blog/database-master-slave-replication-in-the-cloud/
上の画像にもある通り、参照系であるSlaveは1つだけでなく、複数であったりします。
設定ファイルの雛形を作成
SQLAlchemyのインストール方法や設定ファイルの詳細についてはここでは解説しません。
以前自分が書いたこちらの記事を元に設定ファイルを作成していきます。
こちらはMaster/Slaveと分けていない場合のSQLAlchemyの設定ファイルです。
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# 接続先DBの設定
DATABASE = 'mysql://db_user:db_password@192.168.0.1:3306/dbname'
# Engine の作成
Engine = create_engine(
DATABASE,
encoding="utf-8",
echo=False
)
# Sessionの作成
session = scoped_session(
sessionmaker(
autocommit=False,
autoflush=False,
bind=Engine
)
)
# modelで使用する
Base = declarative_base()
Base.query = session.query_property()
Master/Slaveに対応した設定ファイルの作成
それでは上記のdatabase.py
を編集して、Master/Slave
に対応した設定ファイルを作成します。
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker, Session
from sqlalchemy.ext.declarative import declarative_base
# SlaveのDB情報
SLAVE_DB_LIST = os.environ.get('SLAVE_DB_LIST', '')
# 接続先DBの設定
DATABASE = 'mysql://db_user:db_password@192.168.0.1:3306/dbname'
SQLALCHEMY_ENGINES: Dict[str, str] = {
'master': DATABASE,
# 'slave1': 'mysql://db_user:db_password@192.168.0.2:3306/dbname',
}
if SLAVE_DB_LIST:
slave_db_list: list[str] = SLAVE_DB_LIST.split(',')
for i, db in enumerate(slave_db_list):
db_key: str = 'slave' + str(i + 1)
SQLALCHEMY_ENGINES[db_key] = db + f'/{DB_NAME}?{OPTION}'
slave_keys: list[str] = []
engines: Dict[str, Engine] = {}
for key in SQLALCHEMY_ENGINES.keys():
engines[key] = create_engine( # type: ignore
SQLALCHEMY_ENGINES[key],
encoding='utf-8',
logging_name=key,
echo=True,
)
if re.match(r'^slave', key):
slave_keys.append(key)
class DatabaseSession(Session):
_name = None
def get_bind(self) -> Engine: # type: ignore
engine = None
if self._name:
engine = engines[self._name]
elif self._flushing:
engine = engines['master']
else:
engine = engines[random.choice(slave_keys)]
return engine
def using_bind(self, name: str):
session: DatabaseSession = DatabaseSession()
vars(session).update(vars(self))
session._name = name
return session
# Sessionの作成
session = scoped_session(
sessionmaker(
autocommit=False,
autoflush=False,
class_=DatabaseSession,
)
)
# modelで使用する
Base = declarative_base()
Base.query = session.query_property()
MasterとSlaveのエンジンを作成する
上記のdatabase.py
で追加した一部です。SLAVE_DB_LIST
という変数を環境変数で取得します。
こちらはカンマ区切りで以下のようなDB情報を文字列で入れています。
mysql://db_user:db_password@192.168.0.2:3306/dbname
mysql://db_user:db_password@192.168.0.3:3306/dbname
後はSQLALCHEMY_ENGINES
を元にfor文で回して、エンジンを作成して辞書型のenginesに格納します。このenginesがmasterとslaveの両方が格納してあります。
# SlaveのDB情報
SLAVE_DB_LIST = os.environ.get('SLAVE_DB_LIST', '')
# 接続先DBの設定
DATABASE = 'mysql://db_user:db_password@192.168.0.1:3306/dbname'
SQLALCHEMY_ENGINES: Dict[str, str] = {
'master': DATABASE,
# 'slave1': 'mysql://db_user:db_password@192.168.0.2:3306/dbname',
}
if SLAVE_DB_LIST:
slave_db_list: list[str] = SLAVE_DB_LIST.split(',')
for i, db in enumerate(slave_db_list):
db_key: str = 'slave' + str(i + 1)
SQLALCHEMY_ENGINES[db_key] = db + f'/{DB_NAME}?{OPTION}'
slave_keys: list[str] = []
engines: Dict[str, Engine] = {}
for key in SQLALCHEMY_ENGINES.keys():
engines[key] = create_engine( # type: ignore
SQLALCHEMY_ENGINES[key],
encoding='utf-8',
logging_name=key,
echo=True,
)
if re.match(r'^slave', key):
slave_keys.append(key)
カスタマイズのセッションの作成
ここが重要なポイントとなります。カスタマイズのセッションを作成します。
このセッションクラスの中でMasterとSlaveの切り替えを行います。
get_bind
がオーバーライドされるメソッドであり、using_bind
がカスタマイズのメソッドになります。
get_bind
の処理の中身を見ればわかりますが、self._flushing
がTrueになるクエリ結果をデータベースに反映している場合は、MasterのDBのエンジンを使用して、それ以外はSlaveを使用します。
呼び出し元からMaster/Slaveどちらかを選びたい場合のためのusing_bind
も実装します。
class DatabaseSession(Session):
_name = None
def get_bind(self) -> Engine: # type: ignore
engine = None
if self._name:
engine = engines[self._name]
elif self._flushing:
engine = engines['master']
else:
engine = engines[random.choice(slave_keys)]
return engine
def using_bind(self, name: str):
session: DatabaseSession = DatabaseSession()
vars(session).update(vars(self))
session._name = name
return session
sessionmaker
のclass_
にカスタマイズセッションのDatabaseSessionを引数として渡します。
# Sessionの作成
session = scoped_session(
sessionmaker(
autocommit=False,
autoflush=False,
class_=DatabaseSession,
)
)
カスタムセッションを使用してみる
今回は例としてこちらのモデルを使った場合のコードを書いていきます。
from database import session, Base
from sqlalchemy import Column, Integer, String
class SampleModel(Base):
__tablename__ = 'sample'
id = Column('id', Integer, primary_key = True)
title = Column(String(256))
実際にセッションを呼び出してみたコードの例となります。一番最後のusing_bind
メソッドを使用したコードでは、引数にmaster
を使用しているため、参照系ではあるもののmasterにアクセスします。
from database import session
# 更新系のDBであるMasterが使われる
session.add_all([
SampleModel(title='entry_a'),
SampleModel(title='entry_b'),
SampleModel(title='entry_c'),
])
session.commit();
# 参照系のDBであるSlaveが使われる
result = session.query(SampleModel).all()
print('result', result)
# 参照系のDBであるMasterが使われる
result = session().using_bind('master').query(SampleModel).first()
print('result', result)
参考資料
Discussion