😀

【Python】SQLAlchemyでMySQLに接続する

2022/06/13に公開

今回は、Pythonで簡単にSQLを実行できるSQLAlchemyというO/Rマッパーを使用し、実際にMySQLとマッピングしてCSVファイルのデータを追加したり、CRUD操作をする方法について実装を行ったので、備忘録として残しておこうと思います。

前提

  • SQLAlchemyがインストールされていること
  • mysql-connector-pythonがインストールされていること

上記モジュールがインストールされていない場合は、pipにてインストールを実行してください。

% pip install SQLAlchemy
% pip install mysql-connector-python

DBと接続しCSVファイルのデータを追加する

まずは、DBとの接続情報を環境変数として定義します。

環境変数の定義

私の場合、.envに必要な情報を環境変数として定義し、config.pyで読み込みと定義を行なっています。

ここにPORTが必要となる方は、PORTも定義してください。

config.py
from dotenv import load_dotenv
load_dotenv()

import os

DB_USER = os.getenv('DB_USER')
PASSWORD = os.getenv('PASSWORD')
HOST = os.getenv('HOST')
DATABASE = os.getenv('DATABASE')

DBと接続し、データを追加する

環境変数の定義が完了したら、DBと接続しCSVファイルのデータを追加します。
以下がその全容を表したコードです。

sql_sample.py
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, Float

import pandas as pd

import config

user = config.DB_USER
password = config.PASSWORD
host = config.HOST
db_name = config.DATABASE

# engineの設定
engine = create_engine(f'mysql+mysqlconnector://{user}:{password}@{host}/{db_name}')

# セッションの作成
db_session = scoped_session(
  sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine
  )
)

# テーブルを作成する
Base = declarative_base()
Base.query  = db_session.query_property()

# テーブルを定義する
# Baseを継承
class Wine(Base):
  """ワインの情報をもつCSVファイルのクラス

  Args:
      Base (_type_): DeclarativeBase
  """
  # テーブル名
  __tablename__ = 'wines'
  # カラムの定義
  id = Column(Integer, primary_key=True, autoincrement=True)
  wine_class = Column(Integer, unique=False)
  alcohol = Column(Float, unique=False)
  ash = Column(Float, unique=False)
  hue = Column(Float, unique=False)
  proline = Column(Integer, unique=False)
  
  def __init__(self, wine_class=None, alcohol=None, ash=None, hue=None, proline=None):
    self.wine_class = wine_class
    self.alcohol = alcohol
    self.ash = ash
    self.hue = hue
    self.proline = proline

Base.metadata.create_all(bind=engine)

def read_data():
  """CSVファイルを読み込み、DBにデータを追加する関数
  """
  wine_df = pd.read_csv('./data/wine_class.csv')

  for index, _df in wine_df.iterrows():
    row = Wine(wine_class=_df['Class'], alcohol=_df['Alcohol'], ash=_df['Ash'], hue=_df['Hue'], proline=_df['Proline'])
    # データを追加する
    db_session.add(row)

  db_session.commit()

read_data()

接続設定

sql_sample.py
user = config.DB_USER
password = config.PASSWORD
host = config.HOST
db_name = config.DATABASE

# engineの設定
engine = create_engine(f'mysql+mysqlconnector://{user}:{password}@{host}/{db_name}')

# セッションの作成
db_session = scoped_session(
  sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine
  )
)

# テーブルを作成する
Base = declarative_base()
Base.query  = db_session.query_property()

正直、接続して初期データを追加するまでは定型的な記述となります。
まず重要な点としては、create_engine()にて、「どのDBにどのように接続するか」といった設定を保存する部分です。

ここに先ほど、環境変数で定義した値を渡すことで接続設定が行われます。

次にセッション(sessionmaker)の作成ですが、SQLAlchemyでは、このセッションを用いてDBとのやり取りを行うため必須の記述となります。

その次の、Baseはテーブルを作成するための基盤のようなもので、こちらも必須の記述です。

ここで、ようやく設定が完了しました。

テーブル定義

sql_sample.py
# テーブルを定義する
# Baseを継承
class Wine(Base):
  """ワインの情報をもつCSVファイルのクラス

  Args:
      Base (_type_): DeclarativeBase
  """
  # テーブル名
  __tablename__ = 'wines'
  # カラムの定義
  id = Column(Integer, primary_key=True, autoincrement=True)
  wine_class = Column(Integer, unique=False)
  alcohol = Column(Float, unique=False)
  ash = Column(Float, unique=False)
  hue = Column(Float, unique=False)
  proline = Column(Integer, unique=False)
  
  def __init__(self, wine_class=None, alcohol=None, ash=None, hue=None, proline=None):
    self.wine_class = wine_class
    self.alcohol = alcohol
    self.ash = ash
    self.hue = hue
    self.proline = proline

Base.metadata.create_all(bind=engine)

設定が完了したら、テーブルのデータをオブジェクト(クラス)として扱うために、テーブルの内容を定義します。
必要となる情報は、テーブル名とテーブルのカラム名(CSVの列名)です。
そしてインスタンスが作成されたときに、データを追加(INSERT)するため、コンストラクタを定義します。

最後に実際にDBに存在するテーブルとマッピングを行うため、Base.metadata.create_all()でバインドを行います。

※ 実際に使用するDBにテーブルが存在しないとdoesn't existとマッピング先のテーブルがないというエラーが起こります。

CSVファイルのデータを追加する

sql_sample.py
def read_data():
  """CSVファイルを読み込み、DBにデータを追加する関数
  """
  wine_df = pd.read_csv('./data/wine_class.csv')

  for index, _df in wine_df.iterrows():
    row = Wine(wine_class=_df['Class'], alcohol=_df['Alcohol'], ash=_df['Ash'], hue=_df['Hue'], proline=_df['Proline'])
    # データを追加する
    db_session.add(row)

  db_session.commit()

read_data()

いよいよデータを追加していきます。
あらかじめ用意しているCSVファイルを pandasで読み込み、1行ずつ追加(add)していきます。
この部分がCRUDでいうところのCREATEになります。

最後にcommit()をしてDBに処理を反映させます。
これはSQLを使用している人は理解しやすいと思いますが、コミットしないとテーブルに対して行なった処理が反映されないので、コミットが必要となります。

CRUD操作

つづいて、上記でマッピングしたテーブルやデータに対してCRUD操作の実装を行います。

READ

まずは、全件取得してみましょう。
SQLでいうところのSELECT * FROM テーブル名です。

sql_sample.py
# DBにあるWineのデータを全件取得
db = db_session.query(Wine).all()
for row in db:
  # カラムを指定してデータを取得する
  print(row.alcohol)

SQLAlchemyでは基本的にsession.query(クラス名).行いたい処理という形式でクエリを発行します。
そして、取得するデータはインスタンスで取得されるので、カラムを指定するなどしないと、下記の出力がリストでなされることになります。

<__main__.Wine object at 0xxxxxxxxxxx>

基本的なREADの解説は完了したので、以下に続く処理については最低限の解説をするにとどめます。

カラムを指定して取得

sql_sample.py
db = db_session.query(Wine.hue, Wine.proline).all()

条件抽出(WHERE句)

sql_sample.py
db = db_session.query(Wine).filter(Wine.hue > 1.0).all()

WHEREにあたる部分がfilter()となり、引数に条件を指定することで条件抽出が可能となります。

取得レコード数に件数制限をかける(LIMIT句)

sql_sample.py
db = db_session.query(Wine).limit(20).all()

こちらは、そのまんまですがlimit()に取得件数を渡して実装します。

レコードの並び替え(ORDER BY句)

sql_sample.py
from sqlalchemy import desc
db = db_session.query(Wine).order_by(desc(Wine.proline)).all()

ORDER BYを使用する場合も、そのまんまですが、SQLAlchemydesc関数をインポートする必要があります。

CREATE

続いてCRUDのCですが、こちらも簡単に行えます。

sql_sample.py
# CREATE
wine = Wine(wine_class=1, alcohol=1, ash=1, hue=1, proline=1)
# Insertされる
db_session.add(wine)
db_session.commit()

最初にSQLAlchemyO/Rマッパーであると説明しましたが、テーブルとマッピングしているため、マッピングしているオブジェクトのインスタンスを生成し、セッションでadd()するだけでCREATEできます。

最後にコミットを忘れなければデータの作成ができます。

UPDATE

SQLでもそうですが、UPDATEは条件を指定する必要があります。
なぜなら、条件を指定しないと、全データがUPDATEされてしまうからです。
その点を考慮すると、先ほどのREADの部分での実装と組み合わせればできそうですね。

sql_sample.py
# UPDATE
# 条件に合致するものを1件(first())取得
db = db_session.query(Wine).filter(Wine.proline == 1).first()
db.wine_class = 10
db_session.commit()

まずは、UPDATEしたいデータを条件抽出し、「どの値を更新するか」を指定し、更新する値を代入します。
この部分はSQLのUPDATE文内のSETにあたります。

そして、値の代入が完了したら、こちらもコミットします。

DELETE

こちらも注意点としてはUPDATEと同じで、条件を指定する必要があります。

sql_sample.py
# DELETE
db_session.query(Wine).filter(Wine.proline == 1).delete()
db_session.commit()

DELETEの場合は、条件をfilter()で抽出し、delete()するだけで実装できます。
こちらも例のごとくコミットします。

まとめ

PythonのSQLAlchemyによるSQLの実装(DB操作)について解説しました。
ここまで簡単にSQLが扱えるのは、SQL経験者としては非常にラクだなと感じました。また、私の場合はJavaで仕事しているので、Pythonのとっつきやすさを感じました。ぜひご自身の環境で実装してみてください。

参考文献

Discussion