🌳

PythonでGoogle風の検索クエリをSQLにトランスパイル

2025/02/07に公開

streamlitなどでWEBアプリを作る際に検索フォームを導入したい。

検索フォームでは慣れ親しんだGoogle風の検索演算子が使えると便利だと思った。

例えば、以下のように検索できると良い。

hoge
hoge OR foo bar
hoge OR (foo bar)
abc -def
abc -(def OR GHI)

検索クエリからSQLで使用する条件式を生成するには、まず検索クエリをAST(ツリー)に変換して加工しやすくして、そのツリーをSQL条件式にトランスパイルするのが良さそうだった。

変換のイメージとしては、「検索クエリ(文字列) → AST → SQL条件式(文字列)」である。

調べてみるとPythonでは、文字列からASTを生成するのに、pyparsingを使うと良いらしいことがわかった。

検索クエリをLisp風のASTに変換

ASTをどのように作るか

一口にASTと言っても色々な構成方法があると思う。

ツリーのノードをどのように作るかを考える時間もなかったので、Lisp風にすることにした。

Lispであれば言語そのものが構文木のようなものだし、emacsやClojureを通して少しだけ触ったことがあったからだ。

Lispであれば構成要素も少ないし、初心者が枝葉につまずいたり足を滑らせたりすることもないように思えたのも理由の一つだ。

つまり最初の「検索クエリ → AST」では以下のような変換を行うparse_search_query関数を作成できれば良い。

>>> parse_search_query("hoge")
"hoge"
>>> parse_search_query("hoge OR foo bar")
[AND, [OR, "hoge", "foo"], "bar"]
>>> parse_search_query("hoge OR (foo bar)")
[OR, "hoge", [AND, "foo", "bar"]]
>>> parse_search_query("abc -def")
[AND, "abc", [NOT, "def"]]
>>> parse_search_query("abc -(def OR ghi)")
[AND, "abc", [NOT, [OR, "def", "ghi"]]]

Python上でのAST表現とLisp風のイメージは以下のような対応関係がある(ほとんどそのままだけど)。

Python上でのAST表現 Lisp風のイメージ
"hoge" "hoge"
[AND, [OR, "hoge", "foo"], "bar"] (and (or "hoge" "foo") "bar")
[OR, "hoge", [AND, "foo", "bar"]] (or "hoge" (and "foo" "bar"))
[AND, "abc", [NOT, "def"]] (and "abc" (not "def"))
[AND, "abc", [NOT, [OR, "def", "ghi"]]] (and "abc" (not (or "def" "ghi")))

実際の変換

pyparsingに全然触れたことがなかったので、何かサンプルはないかと探したらこのリポジトリでドンピシャなことをしていたので、こちらを参考に自分の必要な箇所のみを切り出していくことにした。

import re
import unicodedata
import logging
from enum import Enum, auto
from typing import Self

import pyparsing as pp

logger = logging.getLogger(__name__)

SearchQueryNode = str | int | float | list

class SearchQueryOperator(Enum):
    AND = auto()
    OR = auto()
    NOT = auto()

    @classmethod
    def from_string(cls, text: str) -> Self | None:
        match text:
            case "AND":
                return cls.AND
            case "OR":
                return cls.OR
            case "NOT" | "-":
                return cls.NOT
            case _:
                return None

def parse_search_query(query_text: str) -> SearchQueryNode:
    """
    Google風のAND/OR検索クエリをパースする

    >>> parse_search_query("HOGE")
    "HOGE"
    >>> parse_search_query("HOGE OR FOO BAR")
    [AND, [OR, "HOGE", "FOO"], "BAR"]
    >>> parse_search_query("HOGE OR (FOO BAR)")
    [OR, "HOGE", [AND, "FOO", "BAR"]]
    >>> parse_search_query("ABC -DEF")
    [AND, "ABC", [NOT, "DEF"]]
    >>> parse_search_query("ABC -(DEF OR GHI)")
    [AND, "ABC", [NOT, [OR, "DEF", "GHI"]]]
    """

    def parse_unary_op(tokens):
        operator = tokens[0][0]
        operand = tokens[0][1]

        match op := SearchQueryOperator.from_string(operator):
            case SearchQueryOperator():
                return [[op, operand]]
            case None:
                raise ValueError(f"Invalid search query operator: {operator}")

    def parse_binary_op(tokens):
        operators = tokens[0][1::2]
        operands = tokens[0][0::2]
        assert len(set(operators)) == 1

        operator = operators[0]

        match op := SearchQueryOperator.from_string(operator):
            case SearchQueryOperator():
                return [[op, *operands]]
            case None:
                raise ValueError(f"Invalid search query operator: {operator}")

    # 全角空白文字などを半角空白文字に正規化
    query_text = unicodedata.normalize("NFKC", query_text)

    AND = pp.Keyword("AND", caseless=False)
    OR = pp.Keyword("OR", caseless=False)
    NOT = pp.Literal("-")

    reserved_words = AND | OR
    word = ~reserved_words + pp.Regex(r'\b[A-Z0-9_-][^\s()*|:]*\b', flags=re.UNICODE)
    quoted_string = pp.QuotedString('"')

    token = word | quoted_string
    notation = pp.infixNotation(
        token,
        [
            (NOT, 1, pp.opAssoc.RIGHT, parse_unary_op),
            (OR, 2, pp.opAssoc.LEFT, parse_binary_op),
            # Optionalのdefaultによって`a b`とANDが省略された場合でも`a AND b`として処理してくれる
            (pp.Optional(AND, default="AND"), 2, pp.opAssoc.LEFT, parse_binary_op),
        ],
    )

    result = notation.parseString(query_text, True)
    result_list = result.as_list()
    return result_list[0]

ASTをSQLにトランスパイル

このASTを利用して実際に問い合わせるためのSQLを生成してみた。

def _to_sql_op(op: SearchQueryOperator) -> str:
    """検索クエリの演算子 ↔︎ SQLの演算子"""
    op_map = {
        SearchQueryOperator.AND: "and",
        SearchQueryOperator.OR: "or",
        SearchQueryOperator.NOT: "not",
    }
    return op_map[op]

def _like(word: str) -> str:
    """
    wordからSQLのwhere句用のlike条件式を生成する。
    取得されるデータのカラム名がハードコードされているため、特定のテーブルまたはSELECT用。

    カラム名が重複していたり、変更されていると不正なSQLになるかもしれない。
    """
    conds = [
        f"first_name like '%{word}%'",
        f"last_name like '%{word}%'",
        f"profile like '%{word}%'",
    ]
    expr = " or ".join(conds)
    return "(" + expr + ")"

def transpile(node) -> str:
    """
    検索クエリをパースした結果の構文木からSQLにトランスパイルを行う。
    _like関数を使うため、こちらもまた特定のテーブルまたはSELECT用。
    """

    if is_leaf(node):
        return _like(node)
    else:
        op, args = node[0], node[1:]
        sql_op = _to_sql_op(op)

        match len(args):
            case 1:  # NOTなどの単項演算子
                cond = transpile(args[0])
                expr = sql_op + " " + cond
                return "(" + expr + ")"
            case x if x >= 2:  # AND, ORなど多項演算子
                conds = [transpile(a) for a in args]
                expr = f" {sql_op} ".join(conds)
                return "(" + expr + ")"
            case _:
                raise RuntimeError(f"The operator is not supported: {op=}, {args=}")

このコードではSQLを文字列として出力したが、以下のようにpolarsのExprとしても出力できる。

from operator import and_, or_, invert
from typing import Any, Callable

from polars import col, Expr


def _to_sql_op(op: SearchQueryOperator) -> Callable[..., bool]:
    """検索クエリの演算子 ↔︎ polarsの演算子"""
    op_map = {
        SearchQueryOperator.AND: and_,
        SearchQueryOperator.OR: or_,
        SearchQueryOperator.NOT: invert,
    }
    return op_map[op]

def _like(word: str) -> Expr:
    return (
        col("first_name").like(f"%{word}%")
        | col("last_name").like(f"%{word}%")
        | col("profile").like(f"%{word}%")
    )

def transpile(node) -> Expr:
    if is_leaf(node):
        return _like(node)
    else:
        op, args = node[0], node[1:]

        match len(args):
            case 1:
                sql_op = _to_sql_op(op)
                return sql_op(args[0])
            case x if x >= 2:
                sql_op = _to_sql_op(op)
                conds = [transpile(a) for a in args]
                return reduce(op, conds)
            case _:
                raise RuntimeError("The operator is not supported: {op=}, {args=}")

Discussion