PythonでGoogle風の検索クエリをSQLにトランスパイル
streamlitなどでWEBアプリを作る際に検索フォームを導入したい。
検索フォームでは慣れ親しんだGoogle風の検索演算子が使えると便利だと思った。
例えば、以下のように検索できると良い。
hoge
hoge OR foo bar
hoge OR (foo bar)
abc -def
abc -(def OR GHI)
検索クエリからSQLで使用する条件式を生成するには、まず検索クエリをAST(ツリー)に変換して加工しやすくして、そのツリーをSQL条件式にトランスパイルするのが良さそうだった。
変換のイメージとしては、「検索クエリ(文字列) → AST → SQL条件式(文字列)」である。
調べてみるとPythonでは、文字列からASTを生成するのに、pyparsingを使うと良いらしいことがわかった。
検索クエリをLisp風のASTに変換
ASTをどのように作るか
一口にASTと言っても色々な構成方法があると思う。
ツリーのノードをどのように作るかを考える時間もなかったので、Lisp風にすることにした。
Lispであれば言語そのものが構文木のようなものだし、emacsやClojureを通して少しだけ触ったことがあったからだ。
Lispであれば構成要素も少ないし、初心者が枝葉につまずいたり足を滑らせたりすることもないように思えたのも理由の一つだ。
つまり最初の「検索クエリ → AST」では以下のような変換を行うparse_search_query
関数を作成できれば良い。
>>> parse_search_query("hoge")
"hoge"
>>> parse_search_query("hoge OR foo bar")
[AND, [OR, "hoge", "foo"], "bar"]
>>> parse_search_query("hoge OR (foo bar)")
[OR, "hoge", [AND, "foo", "bar"]]
>>> parse_search_query("abc -def")
[AND, "abc", [NOT, "def"]]
>>> parse_search_query("abc -(def OR ghi)")
[AND, "abc", [NOT, [OR, "def", "ghi"]]]
Python上でのAST表現とLisp風のイメージは以下のような対応関係がある(ほとんどそのままだけど)。
Python上でのAST表現 | Lisp風のイメージ |
---|---|
"hoge" |
"hoge" |
[AND, [OR, "hoge", "foo"], "bar"] |
(and (or "hoge" "foo") "bar") |
[OR, "hoge", [AND, "foo", "bar"]] |
(or "hoge" (and "foo" "bar")) |
[AND, "abc", [NOT, "def"]] |
(and "abc" (not "def")) |
[AND, "abc", [NOT, [OR, "def", "ghi"]]] |
(and "abc" (not (or "def" "ghi"))) |
実際の変換
pyparsingに全然触れたことがなかったので、何かサンプルはないかと探したらこのリポジトリでドンピシャなことをしていたので、こちらを参考に自分の必要な箇所のみを切り出していくことにした。
import re
import unicodedata
import logging
from enum import Enum, auto
from typing import Self
import pyparsing as pp
logger = logging.getLogger(__name__)
SearchQueryNode = str | int | float | list
class SearchQueryOperator(Enum):
AND = auto()
OR = auto()
NOT = auto()
@classmethod
def from_string(cls, text: str) -> Self | None:
match text:
case "AND":
return cls.AND
case "OR":
return cls.OR
case "NOT" | "-":
return cls.NOT
case _:
return None
def parse_search_query(query_text: str) -> SearchQueryNode:
"""
Google風のAND/OR検索クエリをパースする
>>> parse_search_query("HOGE")
"HOGE"
>>> parse_search_query("HOGE OR FOO BAR")
[AND, [OR, "HOGE", "FOO"], "BAR"]
>>> parse_search_query("HOGE OR (FOO BAR)")
[OR, "HOGE", [AND, "FOO", "BAR"]]
>>> parse_search_query("ABC -DEF")
[AND, "ABC", [NOT, "DEF"]]
>>> parse_search_query("ABC -(DEF OR GHI)")
[AND, "ABC", [NOT, [OR, "DEF", "GHI"]]]
"""
def parse_unary_op(tokens):
operator = tokens[0][0]
operand = tokens[0][1]
match op := SearchQueryOperator.from_string(operator):
case SearchQueryOperator():
return [[op, operand]]
case None:
raise ValueError(f"Invalid search query operator: {operator}")
def parse_binary_op(tokens):
operators = tokens[0][1::2]
operands = tokens[0][0::2]
assert len(set(operators)) == 1
operator = operators[0]
match op := SearchQueryOperator.from_string(operator):
case SearchQueryOperator():
return [[op, *operands]]
case None:
raise ValueError(f"Invalid search query operator: {operator}")
# 全角空白文字などを半角空白文字に正規化
query_text = unicodedata.normalize("NFKC", query_text)
AND = pp.Keyword("AND", caseless=False)
OR = pp.Keyword("OR", caseless=False)
NOT = pp.Literal("-")
reserved_words = AND | OR
word = ~reserved_words + pp.Regex(r'\b[A-Z0-9_-][^\s()*|:]*\b', flags=re.UNICODE)
quoted_string = pp.QuotedString('"')
token = word | quoted_string
notation = pp.infixNotation(
token,
[
(NOT, 1, pp.opAssoc.RIGHT, parse_unary_op),
(OR, 2, pp.opAssoc.LEFT, parse_binary_op),
# Optionalのdefaultによって`a b`とANDが省略された場合でも`a AND b`として処理してくれる
(pp.Optional(AND, default="AND"), 2, pp.opAssoc.LEFT, parse_binary_op),
],
)
result = notation.parseString(query_text, True)
result_list = result.as_list()
return result_list[0]
ASTをSQLにトランスパイル
このASTを利用して実際に問い合わせるためのSQLを生成してみた。
def _to_sql_op(op: SearchQueryOperator) -> str:
"""検索クエリの演算子 ↔︎ SQLの演算子"""
op_map = {
SearchQueryOperator.AND: "and",
SearchQueryOperator.OR: "or",
SearchQueryOperator.NOT: "not",
}
return op_map[op]
def _like(word: str) -> str:
"""
wordからSQLのwhere句用のlike条件式を生成する。
取得されるデータのカラム名がハードコードされているため、特定のテーブルまたはSELECT用。
カラム名が重複していたり、変更されていると不正なSQLになるかもしれない。
"""
conds = [
f"first_name like '%{word}%'",
f"last_name like '%{word}%'",
f"profile like '%{word}%'",
]
expr = " or ".join(conds)
return "(" + expr + ")"
def transpile(node) -> str:
"""
検索クエリをパースした結果の構文木からSQLにトランスパイルを行う。
_like関数を使うため、こちらもまた特定のテーブルまたはSELECT用。
"""
if is_leaf(node):
return _like(node)
else:
op, args = node[0], node[1:]
sql_op = _to_sql_op(op)
match len(args):
case 1: # NOTなどの単項演算子
cond = transpile(args[0])
expr = sql_op + " " + cond
return "(" + expr + ")"
case x if x >= 2: # AND, ORなど多項演算子
conds = [transpile(a) for a in args]
expr = f" {sql_op} ".join(conds)
return "(" + expr + ")"
case _:
raise RuntimeError(f"The operator is not supported: {op=}, {args=}")
このコードではSQLを文字列として出力したが、以下のようにpolarsのExprとしても出力できる。
from operator import and_, or_, invert
from typing import Any, Callable
from polars import col, Expr
def _to_sql_op(op: SearchQueryOperator) -> Callable[..., bool]:
"""検索クエリの演算子 ↔︎ polarsの演算子"""
op_map = {
SearchQueryOperator.AND: and_,
SearchQueryOperator.OR: or_,
SearchQueryOperator.NOT: invert,
}
return op_map[op]
def _like(word: str) -> Expr:
return (
col("first_name").like(f"%{word}%")
| col("last_name").like(f"%{word}%")
| col("profile").like(f"%{word}%")
)
def transpile(node) -> Expr:
if is_leaf(node):
return _like(node)
else:
op, args = node[0], node[1:]
match len(args):
case 1:
sql_op = _to_sql_op(op)
return sql_op(args[0])
case x if x >= 2:
sql_op = _to_sql_op(op)
conds = [transpile(a) for a in args]
return reduce(op, conds)
case _:
raise RuntimeError("The operator is not supported: {op=}, {args=}")
Discussion