postgres限定SQLの小ネタ
徒然なるままに記載
システム運用現場で実際につかったことのあるクエリーたちです。
なにかの参考になるかもしれませんし。
そもそも下記のような小ネタを使わなくて済む構造設計をするための一助になるかなとも思います。
まぁ小ネタなのでアンチパターンと呼ばれるようなものが多いかとおもいます。
IN句にプリペアードステートメントをかましたい場合
sample.master1
id | name |
---|---|
1 | master1_1 |
2 | master1_2 |
3 | master1_3 |
4 | master1_4 |
5 | master1_5 |
上記のid 1,2を取得したい、しかもプリペアードステートメントで。
これは普通にやると実現が難しいですね。
できなくはないですが、今度はステートメント数が多くなったりで悩みを生む要因であったりもすると思います。
--regexp_split_to_table('{カンマ区切りのID群}', E',')で文字列をレコードに分解できる
select CAST(ids as numeric) FROM regexp_split_to_table('1,2', E',')
結果
ids |
---|
1 |
2 |
これを利用して
SELECT
*
FROM master1
WHERE 1=1
AND id IN (select CAST(ids as numeric) FROM regexp_split_to_table($1, E',') AS ids) ;
上記$1に'1,2'を指定すればよい。
これはプログラム上でも応用が利ききそうですね。
SQL構文で無理やりORDERBYの順番操作
SELECT
*
FROM
master1
WHERE 1=1
--無理やり順番そろえ
ORDER BY
CASE id WHEN 2 THEN 1 ELSE 2 END,
CASE id WHEN 3 THEN 1 ELSE 2 END,
CASE id WHEN 1 THEN 1 ELSE 2 END,
id
結果
id | name |
---|---|
2 | master1_2 |
3 | master1_3 |
1 | master1_1 |
4 | master1_4 |
5 | master1_5 |
セレクト結果をJSONで一行に
一行に複数のデータを表現したい場合ってたまにあると思います。
特にGROUP BYなど
また、大量のテーブルからそれぞれ、データを一括で取得するような必要がたまーに発生したりしますよね。
そのような場合の対策です
sample.master1
id | name |
---|---|
1 | master1_1 |
2 | master1_2 |
3 | master1_3 |
sample.master2
id | name |
---|---|
1 | master2_1 |
2 | master2_2 |
3 | master2_3 |
SELECT
ARRAY_TO_JSON(ARRAY_AGG(m1)) as master1_json
from (
SELECT id,name FROM master1
) as m1
上記を実行すると
このようなデータを
master1_json |
---|
[{"id":1,"name":"master1_1"},{"id":2,"name":"master1_2"},{"id":3,"name":"master1_3"}] |
このように取得できる。
さらに、ほかのテーブルも同時に一行でとれる
WITH
master1_json as (
SELECT
ARRAY_TO_JSON(ARRAY_AGG(m1)) as master1_json
from (
SELECT id,name FROM master1
) as m1,
WITH
master2_json as (
SELECT
ARRAY_TO_JSON(ARRAY_AGG(m2)) as master2_json
from (
SELECT id,name FROM master2
) as m2,
SELECT
(select * from master1_json),
(select * from master2_json)
結果
master1_json | master2_json |
---|---|
[{"id":1,"name":"master1_1"},{"id":2,"name":"master1_2"},{"id":3,"name":"master1_3"}] | [{"id":1,"name":"master2_1"},{"id":2,"name":"master2_2"},{"id":3,"name":"master2_3"}] |
テーブル名検索とかに役立つクエリ
令和なので少なくなっているかもしれませんが、Fkeyもろく貼ってない(たまに貼ってある)、ドキュメントもない。
なのに大量のテーブルが存在し、どのテーブルとどのテーブルが関連づいているのかわからない場合。
SQLによるテーブル検索がある程度効果を発揮することもあります。
sample.master1
id | sub1_id(fkey定義なし) | name |
---|---|---|
1 | 1 | master1_1 |
2 | 2 | master1_2 |
3 | 2 | master1_3 |
sample.sub_master1
sub1_id | name |
---|---|
1 | sub1_1 |
2 | sub1_2 |
3 | sub1_3 |
sample.master2
id | name | sub2_id(fkey::sub_master2.id) |
---|---|---|
1 | master2_1 | 1 |
2 | master2_2 | 2 |
3 | master2_3 | 3 |
sample.sub_master2
id | master2_id |
---|---|
1 | 1 |
2 | 2 |
3 | 3 |
カラム名からテーブル名特定
SELECT
table_schema||'.'||table_name as schema_table_name
,table_name as table_name
FROM information_schema.columns
WHERE 1+1
AND column_name={カラム名}
AND table_schema not in ('{対象外としたいスキーマをカンマ区切り}')
ORDER BY ordinal_position
上記のカラム名に'id'を指定して検索すると
schema_table_name | table_name |
---|---|
sample.master1 | master1 |
sample.master2 | master2 |
sample.sub_master2 | sub_master2 |
テーブル名から依存関係にあるテーブル一覧抜出
SELECT
tc.table_name
,ccu.column_name as column_name
,occu.table_schema as origin_table_schema
,occu.table_name as origin_table
,occu.column_name as origin_column
FROM information_schema.table_constraints tc
JOIN information_schema.constraint_column_usage ccu ON tc.constraint_name = ccu.constraint_name
JOIN information_schema.referential_constraints rc ON tc.constraint_name = rc.constraint_name
JOIN information_schema.constraint_column_usage occu ON rc.unique_constraint_name = occu.constraint_name
WHERE 1+1
AND tc.table_name = '{スキーマ名.テーブル名}'
AND tc.constraint_type = 'FOREIGN KEY'
AND occu.table_name not like '{対象外にしたいテーブル名称パターン}'
上記の
「スキーマ名.テーブル名」にsample.master1を指定すると
schema_table_name | table_name |
---|---|
sample.sub_master1 | sub_master1 |
上記結果をうまく組み合わせることで、ある程度網羅的に関連テーブルを検索することができそうですね。
これを利用して、カラム名とその値を指定することで、関連テーブルの同一と思わしきレコードを再帰的にCOPYする
ストアドを作成してみました。
カラム名と値から関連レコードを引っ張るストアド
fnc_copy_tables_for_column
CREATE OR REPLACE function public.fnc_copy_tables_for_column(criteria TEXT, param TEXT, limit_num INTEGER DEFAULT 100)
RETURNS void
LANGUAGE plpgsql
AS $body$
DECLARE
nsname_rec RECORD;
fktname_rec RECORD;
fktrec RECORD;
child boolean;
fktrecsql text;
copysql text;
file_suffix text;
copyfile text;
BEGIN
file_suffix := criteria||'_'||param;
EXECUTE secureSql;
FOR nsname_rec IN (SELECT table_schema||'.'||table_name as schema_table_name,table_name as table_name FROM information_schema.columns WHERE column_name=criteria and table_schema not in ('{対象外としたいスキーマをカンマ区切り}') ORDER BY ordinal_position)
LOOP
FOR fktname_rec IN (
SELECT tc.table_name,c.column_name as column_name,occu.table_schema as origin_table_schema,occu.table_name as origin_table,occu.column_name as origin_column
FROM information_schema.table_constraints tc
JOIN information_schema.constraint_column_usage ccu ON tc.constraint_name = ccu.constraint_name
JOIN information_schema.referential_constraints rc ON tc.constraint_name = rc.constraint_name
JOIN information_schema.constraint_column_usage occu ON rc.unique_constraint_name = occu.constraint_name
JOIN information_schema.key_column_usage c ON c.constraint_name = tc.constraint_name
JOIN information_schema.tables ot ON occu.table_name=ot.table_name AND ot.table_type='BASE TABLE'
WHERE tc.table_name = nsname_rec.table_name
AND tc.constraint_type = 'FOREIGN KEY'
AND occu.table_name not like '{対象外にしたいテーブル名称パターン}'
AND EXISTS (SELECT 1 FROM information_schema.columns WHERE table_schema=tc.table_schema AND table_name=tc.table_name AND column_name=ccu.column_name AND data_type IN ('bigint','integer','numeric'))
) LOOP
fktrecsql := format('SELECT ARRAY_TO_STRING(ARRAY_AGG(cast(fktparam as text)),%L) as fktparam FROM (SELECT distinct '||fktname_rec.column_name||' as fktparam FROM '||nsname_rec.table_name||' WHERE '||criteria||' =%L) a',',',param);
EXECUTE fktrecsql INTO fktrec;
IF fktrec.fktparam IS NOT NULL THEN
EXECUTE (SELECT public.fnc_copy_tables_for_table_name(fktname_rec.origin_table_schema,fktname_rec.origin_table,fktname_rec.origin_column,fktrec.fktparam,file_suffix,limit_num));
ELSE
RAISE INFO 'record_nothing::%',fktname_rec.origin_table;
END IF;
END LOOP;
copyfile := '/tmp/copyfiles/copy_'||nsname_rec.schema_table_name||'_'||file_suffix||'.csv';
copysql := format('COPY (SELECT * FROM '||nsname_rec.schema_table_name||' WHERE '||criteria||' = %L LIMIT %L) TO %L WITH CSV;', param, limit_num, copyfile);
RAISE INFO '%',copysql;
EXECUTE copysql;
END LOOP;
END;
$body$
VOLATILE
COST 100;
上記が参照するfnc_copy_tables_for_table_nameという再帰的なストアド
fnc_copy_tables_for_table_name
CREATE OR REPLACE function public.fnc_copy_tables_for_table_name(schemaname TEXT, tablename TEXT, criteria TEXT, param TEXT, file_suffix text, limit_num INTEGER DEFAULT 100)
RETURNS void
LANGUAGE plpgsql
AS $body$
DECLARE
nsname_rec RECORD;
fktname_rec RECORD;
fktrec RECORD;
fktrecsql text;
copysql text;
copyfile text;
BEGIN
FOR fktname_rec IN (
SELECT tc.table_name,c.column_name as column_name,occu.table_schema as origin_table_schema,occu.table_name as origin_table,occu.column_name as origin_column
FROM information_schema.table_constraints tc
JOIN information_schema.constraint_column_usage ccu ON tc.constraint_name = ccu.constraint_name
JOIN information_schema.referential_constraints rc ON tc.constraint_name = rc.constraint_name
JOIN information_schema.constraint_column_usage occu ON rc.unique_constraint_name = occu.constraint_name
JOIN information_schema.key_column_usage c ON c.constraint_name = tc.constraint_name
JOIN information_schema.tables ot ON occu.table_name=ot.table_name AND ot.table_type='BASE TABLE'
WHERE tc.table_name = tablename
AND tc.constraint_type = 'FOREIGN KEY'
AND occu.table_name not like '{対象外にしたいテーブル名称パターン}'
AND EXISTS (SELECT 1 FROM information_schema.columns WHERE table_schema=tc.table_schema AND table_name=tc.table_name AND column_name=ccu.column_name AND data_type IN ('bigint','integer','numeric'))
) LOOP
fktrecsql := format('SELECT ARRAY_TO_STRING(ARRAY_AGG(cast(fktparam as text)),%L) as fktparam FROM (SELECT distinct '||fktname_rec.column_name||' as fktparam FROM '||nsname_rec.table_name||' WHERE '||criteria||' IN (select CAST(ids as numeric) FROM regexp_split_to_table(%L, E%L) AS ids) ) a',',',param,',');
EXECUTE fktrecsql INTO fktrec;
IF fktrec.fktparam IS NOT NULL THEN
EXECUTE (SELECT public.fnc_copy_tables_for_table_name(fktname_rec.origin_table_schema,fktname_rec.origin_table,fktname_rec.origin_column,fktrec.fktparam,file_suffix,limit_num));
ELSE
RAISE INFO 'record_nothing::%',fktname_rec.origin_table;
END IF;
END LOOP;
copyfile := '/tmp/copyfiles/copy_'||schemaname||'.'||tablename||'_'||file_suffix||'.csv';
copysql := format('COPY (SELECT * FROM '||schemaname||'.'||tablename||' WHERE '||criteria||' IN (select CAST(ids as numeric) FROM regexp_split_to_table(%L, E%L) AS ids) LIMIT %L) TO %L WITH CSV;', param,',', limit_num, copyfile);
RAISE INFO '%',copysql;
EXECUTE copysql;
END;
$body$
VOLATILE
COST 100;
上記をcreateしたうえで
SELECT public.fnc_copy_tables_for_column('id','1');
を実行すると。
sample.master1
id | sub1_id(fkey定義なし) | name |
---|---|---|
1 | 1 | master1_1 |
sample.sub_master1
sub1_id | name |
---|---|
1 | sub1_1 |
sample.master2
id | name | sub2_id(fkey::sub_master2.id) |
---|---|---|
1 | master2_1 | 1 |
sample.sub_master2
id | master2_id |
---|---|
1 | 1 |
のCSVデータ出来上がると思います。
イミュータブルデータモデル設計をしたい際の変更履歴記録
イミュータブルモデルを採用するときに限った話ではないのですが。
データを変更した履歴を残したい。
といった状況は頻繁に訪れると思います。
その際、トリガーを使いINSERT、UPDATE時に自動で履歴テーブルに追加みたいなことも選択肢としてでてきますよね。
ただ、その対象が多くなると、非常にメンドクサイことに一個一個のテーブルに対して丁寧にトリガーを作成して関連付けてあげる必要が出てくると思います。
これを少しだけ楽にする関数を作りました
fnc_logged_audit()
CREATE OR REPLACE FUNCTION sample.fnc_logged_audit()
RETURNS trigger
LANGUAGE plpgsql
AS
$body$
DECLARE
audit_table_name text;
new_pivot record;
t_column_text text;
t_value_text text;
audit_sql text;
target_rec record;
action_type char(1);
BEGIN
--履歴トリガー関数
BEGIN
IF(TG_OP = 'INSERT') THEN action_type := 1;
ELSEIF(TG_OP = 'UPDATE') THEN action_type := 2;
ELSE action_type := 3;
END IF;
audit_table_name := TG_TABLE_NAME||'_audit';
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE' ) THEN
target_rec := NEW;
ELSE
target_rec := OLD;
END IF;
t_column_text := '';
t_value_text := '';
-- NEWまたはOLDをjson成形後、pivotしたものをLOOP、column,valueを利用してSQLを生成する
FOR new_pivot in SELECT * FROM json_each_text((SELECT to_json(target_rec)))as a
LOOP
--RAISE WARNING 'test:%',new_pivot.key;
--RAISE WARNING 'test:%',new_pivot.value;
t_column_text := t_column_text||','||new_pivot.key;
IF new_pivot.value IS NULL THEN
t_value_text := t_value_text||',null';
ELSE
t_value_text := t_value_text||','||''''||new_pivot.value||'''';
END IF;
END LOOP;
t_column_text := t_column_text||',action_type';
t_value_text := t_value_text||','||''''||action_type||'''';
SELECT INTO t_column_text (SELECT substr(t_column_text,2));
SELECT INTO t_value_text (SELECT substr(t_value_text,2));
audit_sql := 'INSERT INTO audit.'||audit_table_name|| ' ('||t_column_text||') VALUES('||t_value_text||')';
--RAISE WARNING 'test:%', audit_sql;
EXECUTE audit_sql;
EXCEPTION
WHEN division_by_zero THEN
RAISE WARNING 'caught division_by_zero';
RETURN NULL;
WHEN OTHERS THEN
RAISE WARNING 'fnc_logged_audit ERROR:%',sqlstate;
RETURN NULL;
END;
RETURN NULL;
END;
$body$
VOLATILE
COST 100;
上記ストアドをトリガーで紐づけます。
CREATE TRIGGER trg_master1_z AFTER INSERT OR UPDATE OR DELETE
ON sample.master1 FOR EACH ROW
EXECUTE PROCEDURE sample.fnc_logged_audit();
CREATE TRIGGER trg_master2_z AFTER INSERT OR UPDATE OR DELETE
ON sample.master2 FOR EACH ROW
EXECUTE PROCEDURE sample.fnc_logged_audit();
CREATE TRIGGER trg_sub_master1_z AFTER INSERT OR UPDATE OR DELETE
ON sample.sub_master1 FOR EACH ROW
EXECUTE PROCEDURE sample.fnc_logged_audit();
このように関連付けたうえで、
sample.master1または sample.master2または sample.sub_master1
に対してINSERT|UPDATE|DELETEを実行すると
auditスキーマの「同名+_audit」テーブルの上に履歴レコードが追加されます。
おわりに
一旦ここまでとして何か思い出したらまた第二弾を記事作ろうと思います。
なんかarray系のネタがおおいなぁって思いました。
こういう融通の利き方をみたりすると、postgresは開発者に寄り添ってるなぁって思ってます。
なにはともあれ、とにかく正規化されてなさすぎるDBと戦ってる方々への助けと慣れれば幸いです。
間違いや、ご指摘事項お待ちしております。
Discussion