🎴

SQLAlchemyでmaster/slaveの切り替え

2022/11/13に公開

RDBを使用していて、アクセス数が増えて負荷がかかった際の対処法としては、更新用と参照用でRDBを分けるレプリケーションというものがあります。

PythonのORMであるSQLAlchemyを使用していて、ORMからRDBにアクセスする際には、参照用と更新用に分けたいという状況もあると思います。

RDBのレプリケーションを実施した際に、SQLAlchemyに更新用と参照用を切り替えるための処理の追加方法を紹介します。

レプリケーションとは

ここでレプリケーションを知らない方のために簡単に解説します。

MySQL/PostgresqlといったRDBを運用していて、アクセス数などが増大していけば、システムの負荷が高くなり、パフォーマンスが低下していきます。

その対処として、レプリケーションというRDBを更新用と参照用に分ける方法です。

参照とはselect文が実行されるような処理を指し、insertやupdateといった処理は更新を指します。
更新用のDBをMaster、参照用のことをSlaveと言って、Master/Slave構成と言ったもします。

※最近ではMaster/Slaveといいかたは、「主人と奴隷」を指す差別用語だと言われることも多いため、メインとレプリカといったりする言い方が一般的になってきているようです。

一般的に更新系よりも参照系の方が実行される回数が多いため、参照系の処理ならSlaveの方にアクセスして、更新系ならMasterの方にアクセスするようにします。

参照系であるSlaveは更新系のMasterの情報と同期しており、Master側が更新されれば、瞬時に参照系のSlaveにも同じ内容が渡って同期されます。

https://mariadb.com/sites/default/files/pictures/Images/dbreplication173.png

出典:https://mariadb.com/resources/blog/database-master-slave-replication-in-the-cloud/

上の画像にもある通り、参照系であるSlaveは1つだけでなく、複数であったりします。

設定ファイルの雛形を作成

SQLAlchemyのインストール方法や設定ファイルの詳細についてはここでは解説しません。
以前自分が書いたこちらの記事を元に設定ファイルを作成していきます。

https://zenn.dev/shimakaze_soft/articles/6e5e47851459f5#設定ファイルの作成

こちらはMaster/Slaveと分けていない場合のSQLAlchemyの設定ファイルです。

database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# 接続先DBの設定
DATABASE = 'mysql://db_user:db_password@192.168.0.1:3306/dbname'

# Engine の作成
Engine = create_engine(
  DATABASE,
  encoding="utf-8",
  echo=False
)

# Sessionの作成
session = scoped_session(
    sessionmaker(
        autocommit=False,
        autoflush=False,
        bind=Engine
    )
)

# modelで使用する
Base = declarative_base()
Base.query = session.query_property()

Master/Slaveに対応した設定ファイルの作成

それでは上記のdatabase.pyを編集して、Master/Slaveに対応した設定ファイルを作成します。

database.py
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker, Session
from sqlalchemy.ext.declarative import declarative_base

# SlaveのDB情報
SLAVE_DB_LIST = os.environ.get('SLAVE_DB_LIST', '')

# 接続先DBの設定
DATABASE = 'mysql://db_user:db_password@192.168.0.1:3306/dbname'

SQLALCHEMY_ENGINES: Dict[str, str] = {
    'master': DATABASE,
    # 'slave1': 'mysql://db_user:db_password@192.168.0.2:3306/dbname',
}

if SLAVE_DB_LIST:
    slave_db_list: list[str] = SLAVE_DB_LIST.split(',')
    for i, db in enumerate(slave_db_list):
        db_key: str = 'slave' + str(i + 1)
        SQLALCHEMY_ENGINES[db_key] = db + f'/{DB_NAME}?{OPTION}'

slave_keys: list[str] = []
engines: Dict[str, Engine] = {}
for key in SQLALCHEMY_ENGINES.keys():
    engines[key] = create_engine(  # type: ignore
        SQLALCHEMY_ENGINES[key],
        encoding='utf-8',
        logging_name=key,
        echo=True,
    )

    if re.match(r'^slave', key):
        slave_keys.append(key)

class DatabaseSession(Session):
    _name = None

    def get_bind(self) -> Engine:  # type: ignore
        engine = None
        if self._name:
            engine = engines[self._name]
        elif self._flushing:
            engine = engines['master']
        else:
            engine = engines[random.choice(slave_keys)]
        return engine

    def using_bind(self, name: str):
        session: DatabaseSession = DatabaseSession()
        vars(session).update(vars(self))
        session._name = name

        return session

# Sessionの作成
session = scoped_session(
    sessionmaker(
        autocommit=False,
        autoflush=False,
        class_=DatabaseSession,
    )
)

# modelで使用する
Base = declarative_base()
Base.query = session.query_property()

MasterとSlaveのエンジンを作成する

上記のdatabase.pyで追加した一部です。SLAVE_DB_LISTという変数を環境変数で取得します。
こちらはカンマ区切りで以下のようなDB情報を文字列で入れています。

  • mysql://db_user:db_password@192.168.0.2:3306/dbname
  • mysql://db_user:db_password@192.168.0.3:3306/dbname

後はSQLALCHEMY_ENGINESを元にfor文で回して、エンジンを作成して辞書型のenginesに格納します。このenginesがmasterとslaveの両方が格納してあります。

# SlaveのDB情報
SLAVE_DB_LIST = os.environ.get('SLAVE_DB_LIST', '')

# 接続先DBの設定
DATABASE = 'mysql://db_user:db_password@192.168.0.1:3306/dbname'

SQLALCHEMY_ENGINES: Dict[str, str] = {
    'master': DATABASE,
    # 'slave1': 'mysql://db_user:db_password@192.168.0.2:3306/dbname',
}

if SLAVE_DB_LIST:
    slave_db_list: list[str] = SLAVE_DB_LIST.split(',')
    for i, db in enumerate(slave_db_list):
        db_key: str = 'slave' + str(i + 1)
        SQLALCHEMY_ENGINES[db_key] = db + f'/{DB_NAME}?{OPTION}'

slave_keys: list[str] = []
engines: Dict[str, Engine] = {}
for key in SQLALCHEMY_ENGINES.keys():
    engines[key] = create_engine(  # type: ignore
        SQLALCHEMY_ENGINES[key],
        encoding='utf-8',
        logging_name=key,
        echo=True,
    )

    if re.match(r'^slave', key):
        slave_keys.append(key)

カスタマイズのセッションの作成

ここが重要なポイントとなります。カスタマイズのセッションを作成します。
このセッションクラスの中でMasterとSlaveの切り替えを行います。

get_bindがオーバーライドされるメソッドであり、using_bindがカスタマイズのメソッドになります。

get_bindの処理の中身を見ればわかりますが、self._flushingがTrueになるクエリ結果をデータベースに反映している場合は、MasterのDBのエンジンを使用して、それ以外はSlaveを使用します。

呼び出し元からMaster/Slaveどちらかを選びたい場合のためのusing_bindも実装します。

class DatabaseSession(Session):
    _name = None

    def get_bind(self) -> Engine:  # type: ignore
        engine = None
        if self._name:
            engine = engines[self._name]
        elif self._flushing:
            engine = engines['master']
        else:
            engine = engines[random.choice(slave_keys)]
        return engine

    def using_bind(self, name: str):
        session: DatabaseSession = DatabaseSession()
        vars(session).update(vars(self))
        session._name = name

        return session

sessionmakerclass_にカスタマイズセッションのDatabaseSessionを引数として渡します。

# Sessionの作成
session = scoped_session(
    sessionmaker(
        autocommit=False,
        autoflush=False,
        class_=DatabaseSession,
    )
)

カスタムセッションを使用してみる

今回は例としてこちらのモデルを使った場合のコードを書いていきます。

from database import session, Base
from sqlalchemy import Column, Integer, String

class SampleModel(Base):
    __tablename__ = 'sample'
    id = Column('id', Integer, primary_key = True)
    title = Column(String(256))

実際にセッションを呼び出してみたコードの例となります。一番最後のusing_bindメソッドを使用したコードでは、引数にmasterを使用しているため、参照系ではあるもののmasterにアクセスします。

app.py
from database import session

# 更新系のDBであるMasterが使われる
session.add_all([
    SampleModel(title='entry_a'),
    SampleModel(title='entry_b'),
    SampleModel(title='entry_c'),
])
session.commit();

# 参照系のDBであるSlaveが使われる
result = session.query(SampleModel).all()
print('result', result)

# 参照系のDBであるMasterが使われる
result = session().using_bind('master').query(SampleModel).first()
print('result', result)

参考資料

https://docs.sqlalchemy.org/en/20/orm/persistence_techniques.html#custom-vertical-partitioning

https://toritori0318.hatenadiary.jp/entry/20130513/1368451461

https://zenn.dev/rsk_for_dev/articles/3c30597bd040ce

https://sakaik.hateblo.jp/entry/20200702/mysql_changes_some_terminologies_eg_master_slave

Discussion