🐱

postgres限定SQLの小ネタ

2022/09/01に公開

徒然なるままに記載

システム運用現場で実際につかったことのあるクエリーたちです。
なにかの参考になるかもしれませんし。
そもそも下記のような小ネタを使わなくて済む構造設計をするための一助になるかなとも思います。
まぁ小ネタなのでアンチパターンと呼ばれるようなものが多いかとおもいます。

IN句にプリペアードステートメントをかましたい場合

sample.master1

id name
1 master1_1
2 master1_2
3 master1_3
4 master1_4
5 master1_5

上記のid 1,2を取得したい、しかもプリペアードステートメントで。
これは普通にやると実現が難しいですね。
できなくはないですが、今度はステートメント数が多くなったりで悩みを生む要因であったりもすると思います。

--regexp_split_to_table('{カンマ区切りのID群}', E',')で文字列をレコードに分解できる
select CAST(ids as numeric)  FROM regexp_split_to_table('1,2', E',') 

結果

ids
1
2

これを利用して

SELECT 
 *
FROM master1
WHERE 1=1
AND id IN (select CAST(ids as numeric)  FROM regexp_split_to_table($1, E',') AS ids) ;

上記$1に'1,2'を指定すればよい。
これはプログラム上でも応用が利ききそうですね。

SQL構文で無理やりORDERBYの順番操作

SELECT
    *
FROM
    master1
WHERE 1=1
--無理やり順番そろえ
ORDER BY
  CASE id WHEN  2  THEN 1 ELSE 2 END,
  CASE id WHEN  3  THEN 1 ELSE 2 END,
  CASE id WHEN  1  THEN 1 ELSE 2 END,
id

結果

id name
2 master1_2
3 master1_3
1 master1_1
4 master1_4
5 master1_5

セレクト結果をJSONで一行に

一行に複数のデータを表現したい場合ってたまにあると思います。
特にGROUP BYなど
また、大量のテーブルからそれぞれ、データを一括で取得するような必要がたまーに発生したりしますよね。
そのような場合の対策です

sample.master1

id name
1 master1_1
2 master1_2
3 master1_3

sample.master2

id name
1 master2_1
2 master2_2
3 master2_3
SELECT 
  ARRAY_TO_JSON(ARRAY_AGG(m1)) as master1_json
from (
  SELECT id,name FROM master1
) as m1

上記を実行すると

このようなデータを

master1_json
[{"id":1,"name":"master1_1"},{"id":2,"name":"master1_2"},{"id":3,"name":"master1_3"}]

このように取得できる。
さらに、ほかのテーブルも同時に一行でとれる

WITH
master1_json as (
SELECT 
  ARRAY_TO_JSON(ARRAY_AGG(m1)) as master1_json
from (
  SELECT id,name FROM master1
) as m1,
WITH
master2_json as (
SELECT 
  ARRAY_TO_JSON(ARRAY_AGG(m2)) as master2_json
from (
  SELECT id,name FROM master2
) as m2,
SELECT
(select * from master1_json),
(select * from master2_json)

結果

master1_json master2_json
[{"id":1,"name":"master1_1"},{"id":2,"name":"master1_2"},{"id":3,"name":"master1_3"}] [{"id":1,"name":"master2_1"},{"id":2,"name":"master2_2"},{"id":3,"name":"master2_3"}]

テーブル名検索とかに役立つクエリ

令和なので少なくなっているかもしれませんが、Fkeyもろく貼ってない(たまに貼ってある)、ドキュメントもない。
なのに大量のテーブルが存在し、どのテーブルとどのテーブルが関連づいているのかわからない場合。
SQLによるテーブル検索がある程度効果を発揮することもあります。

sample.master1

id sub1_id(fkey定義なし) name
1 1 master1_1
2 2 master1_2
3 2 master1_3

sample.sub_master1

sub1_id name
1 sub1_1
2 sub1_2
3 sub1_3

sample.master2

id name sub2_id(fkey::sub_master2.id)
1 master2_1 1
2 master2_2 2
3 master2_3 3

sample.sub_master2

id master2_id
1 1
2 2
3 3

カラム名からテーブル名特定

SELECT 
table_schema||'.'||table_name as schema_table_name
,table_name as table_name 
FROM  information_schema.columns 
WHERE 1+1
AND column_name={カラム名}
AND table_schema not in ('{対象外としたいスキーマをカンマ区切り}') 
ORDER BY ordinal_position

上記のカラム名に'id'を指定して検索すると

schema_table_name table_name
sample.master1 master1
sample.master2 master2
sample.sub_master2 sub_master2

テーブル名から依存関係にあるテーブル一覧抜出

SELECT 
 tc.table_name
 ,ccu.column_name as column_name
 ,occu.table_schema as origin_table_schema
 ,occu.table_name as origin_table
 ,occu.column_name as origin_column
FROM information_schema.table_constraints tc
JOIN information_schema.constraint_column_usage ccu ON tc.constraint_name = ccu.constraint_name
JOIN information_schema.referential_constraints rc ON tc.constraint_name = rc.constraint_name
JOIN information_schema.constraint_column_usage occu ON rc.unique_constraint_name = occu.constraint_name
WHERE 1+1 
AND tc.table_name = '{スキーマ名.テーブル名}'
AND tc.constraint_type = 'FOREIGN KEY'
AND occu.table_name not like '{対象外にしたいテーブル名称パターン}'

上記の
「スキーマ名.テーブル名」にsample.master1を指定すると

schema_table_name table_name
sample.sub_master1 sub_master1

上記結果をうまく組み合わせることで、ある程度網羅的に関連テーブルを検索することができそうですね。

これを利用して、カラム名とその値を指定することで、関連テーブルの同一と思わしきレコードを再帰的にCOPYする
ストアドを作成してみました。

カラム名と値から関連レコードを引っ張るストアド

fnc_copy_tables_for_column

CREATE OR REPLACE function public.fnc_copy_tables_for_column(criteria TEXT, param TEXT, limit_num INTEGER DEFAULT 100)
    RETURNS void
    LANGUAGE plpgsql
AS $body$
DECLARE
    nsname_rec RECORD;
    fktname_rec RECORD;
    fktrec RECORD;
    child boolean;
    fktrecsql text;
    copysql text;
    file_suffix text;
    copyfile text;
BEGIN
    file_suffix := criteria||'_'||param;
    EXECUTE secureSql;
    FOR nsname_rec IN (SELECT table_schema||'.'||table_name as schema_table_name,table_name as table_name FROM  information_schema.columns WHERE column_name=criteria and  table_schema not in ('{対象外としたいスキーマをカンマ区切り}') ORDER BY ordinal_position)
    LOOP
        FOR fktname_rec IN (
          SELECT tc.table_name,c.column_name as column_name,occu.table_schema as origin_table_schema,occu.table_name as origin_table,occu.column_name as origin_column
          FROM information_schema.table_constraints tc
          JOIN information_schema.constraint_column_usage ccu ON tc.constraint_name = ccu.constraint_name
          JOIN information_schema.referential_constraints rc ON tc.constraint_name = rc.constraint_name
          JOIN information_schema.constraint_column_usage occu ON rc.unique_constraint_name = occu.constraint_name
          JOIN information_schema.key_column_usage c ON c.constraint_name = tc.constraint_name
          JOIN information_schema.tables ot ON occu.table_name=ot.table_name AND ot.table_type='BASE TABLE'
          WHERE tc.table_name = nsname_rec.table_name
          AND tc.constraint_type = 'FOREIGN KEY'
          AND occu.table_name not like '{対象外にしたいテーブル名称パターン}'
          AND EXISTS (SELECT 1 FROM  information_schema.columns WHERE table_schema=tc.table_schema AND table_name=tc.table_name AND column_name=ccu.column_name AND data_type IN ('bigint','integer','numeric'))
        ) LOOP
          fktrecsql := format('SELECT ARRAY_TO_STRING(ARRAY_AGG(cast(fktparam as text)),%L) as fktparam FROM (SELECT distinct '||fktname_rec.column_name||' as fktparam FROM '||nsname_rec.table_name||' WHERE '||criteria||' =%L) a',',',param);
          EXECUTE fktrecsql INTO fktrec;
          IF fktrec.fktparam IS NOT NULL THEN
            EXECUTE (SELECT public.fnc_copy_tables_for_table_name(fktname_rec.origin_table_schema,fktname_rec.origin_table,fktname_rec.origin_column,fktrec.fktparam,file_suffix,limit_num));
          ELSE
            RAISE INFO 'record_nothing::%',fktname_rec.origin_table;
          END IF;
        END LOOP;
        copyfile := '/tmp/copyfiles/copy_'||nsname_rec.schema_table_name||'_'||file_suffix||'.csv';
        copysql := format('COPY (SELECT * FROM '||nsname_rec.schema_table_name||' WHERE '||criteria||' = %L  LIMIT %L) TO %L WITH CSV;', param, limit_num, copyfile);
        RAISE INFO '%',copysql;
        EXECUTE copysql;
    END LOOP;
END;
$body$
    VOLATILE
    COST 100;

上記が参照するfnc_copy_tables_for_table_nameという再帰的なストアド
fnc_copy_tables_for_table_name

CREATE OR REPLACE function public.fnc_copy_tables_for_table_name(schemaname TEXT, tablename TEXT, criteria TEXT, param TEXT, file_suffix text, limit_num INTEGER DEFAULT 100)
    RETURNS void
    LANGUAGE plpgsql
AS $body$
DECLARE
    nsname_rec RECORD;
    fktname_rec RECORD;
    fktrec RECORD;
    fktrecsql text;
    copysql text;
    copyfile text;
BEGIN

    FOR fktname_rec IN (
      SELECT tc.table_name,c.column_name as column_name,occu.table_schema as origin_table_schema,occu.table_name as origin_table,occu.column_name as origin_column
      FROM information_schema.table_constraints tc
      JOIN information_schema.constraint_column_usage ccu ON tc.constraint_name = ccu.constraint_name
      JOIN information_schema.referential_constraints rc ON tc.constraint_name = rc.constraint_name
      JOIN information_schema.constraint_column_usage occu ON rc.unique_constraint_name = occu.constraint_name
      JOIN information_schema.key_column_usage c ON c.constraint_name = tc.constraint_name
      JOIN information_schema.tables ot ON occu.table_name=ot.table_name AND ot.table_type='BASE TABLE'
      WHERE tc.table_name = tablename
      AND tc.constraint_type = 'FOREIGN KEY'
      AND occu.table_name not like '{対象外にしたいテーブル名称パターン}'
      AND EXISTS (SELECT 1 FROM  information_schema.columns WHERE table_schema=tc.table_schema AND table_name=tc.table_name AND column_name=ccu.column_name AND data_type IN ('bigint','integer','numeric'))
    ) LOOP
      fktrecsql := format('SELECT ARRAY_TO_STRING(ARRAY_AGG(cast(fktparam as text)),%L) as fktparam FROM (SELECT distinct '||fktname_rec.column_name||' as fktparam FROM '||nsname_rec.table_name||' WHERE '||criteria||' IN (select CAST(ids as numeric)  FROM regexp_split_to_table(%L, E%L) AS ids) ) a',',',param,',');
      EXECUTE fktrecsql INTO fktrec;
      IF fktrec.fktparam IS NOT NULL THEN
        EXECUTE (SELECT public.fnc_copy_tables_for_table_name(fktname_rec.origin_table_schema,fktname_rec.origin_table,fktname_rec.origin_column,fktrec.fktparam,file_suffix,limit_num));
      ELSE
        RAISE INFO 'record_nothing::%',fktname_rec.origin_table;
      END IF;
    END LOOP;
    copyfile := '/tmp/copyfiles/copy_'||schemaname||'.'||tablename||'_'||file_suffix||'.csv';
    copysql := format('COPY (SELECT * FROM '||schemaname||'.'||tablename||' WHERE '||criteria||' IN (select CAST(ids as numeric)  FROM regexp_split_to_table(%L, E%L) AS ids)  LIMIT %L) TO %L WITH CSV;', param,',', limit_num, copyfile);
    RAISE INFO '%',copysql;
    EXECUTE copysql;
END;
$body$
    VOLATILE
    COST 100;

上記をcreateしたうえで

SELECT public.fnc_copy_tables_for_column('id','1');

を実行すると。
sample.master1

id sub1_id(fkey定義なし) name
1 1 master1_1

sample.sub_master1

sub1_id name
1 sub1_1

sample.master2

id name sub2_id(fkey::sub_master2.id)
1 master2_1 1

sample.sub_master2

id master2_id
1 1

のCSVデータ出来上がると思います。

イミュータブルデータモデル設計をしたい際の変更履歴記録

イミュータブルモデルを採用するときに限った話ではないのですが。
データを変更した履歴を残したい。
といった状況は頻繁に訪れると思います。

その際、トリガーを使いINSERT、UPDATE時に自動で履歴テーブルに追加みたいなことも選択肢としてでてきますよね。
ただ、その対象が多くなると、非常にメンドクサイことに一個一個のテーブルに対して丁寧にトリガーを作成して関連付けてあげる必要が出てくると思います。

これを少しだけ楽にする関数を作りました

fnc_logged_audit()

CREATE OR REPLACE FUNCTION sample.fnc_logged_audit()
  RETURNS trigger
  LANGUAGE plpgsql
AS
$body$
DECLARE
    audit_table_name text;
    new_pivot record;
    t_column_text text;
    t_value_text text;
    audit_sql text;
    target_rec record;
    action_type char(1);
BEGIN
    --履歴トリガー関数
    BEGIN
        IF(TG_OP = 'INSERT') THEN action_type := 1;
        ELSEIF(TG_OP = 'UPDATE') THEN action_type := 2;
        ELSE action_type := 3;
        END IF;
        audit_table_name :=  TG_TABLE_NAME||'_audit';
        IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE'  ) THEN
            target_rec := NEW;
        ELSE
            target_rec := OLD;
        END IF;
        t_column_text  := '';
        t_value_text  := '';
        -- NEWまたはOLDをjson成形後、pivotしたものをLOOP、column,valueを利用してSQLを生成する
        FOR new_pivot in SELECT * FROM json_each_text((SELECT to_json(target_rec)))as a
            LOOP
                --RAISE WARNING 'test:%',new_pivot.key;
                --RAISE WARNING 'test:%',new_pivot.value;
                t_column_text := t_column_text||','||new_pivot.key;
                IF new_pivot.value IS NULL THEN
                    t_value_text := t_value_text||',null';
                ELSE
                    t_value_text := t_value_text||','||''''||new_pivot.value||'''';
                END IF;
            END LOOP;
        t_column_text := t_column_text||',action_type';
        t_value_text := t_value_text||','||''''||action_type||'''';
        SELECT  INTO t_column_text (SELECT substr(t_column_text,2));
        SELECT  INTO t_value_text (SELECT substr(t_value_text,2));
        audit_sql := 'INSERT INTO  audit.'||audit_table_name|| ' ('||t_column_text||') VALUES('||t_value_text||')';
        --RAISE WARNING 'test:%', audit_sql;
        EXECUTE audit_sql;
    EXCEPTION
        WHEN division_by_zero THEN
            RAISE WARNING 'caught division_by_zero';
            RETURN NULL;
        WHEN OTHERS THEN
            RAISE WARNING  'fnc_logged_audit ERROR:%',sqlstate;
            RETURN NULL;
    END;
    RETURN NULL;
END;
$body$
  VOLATILE
  COST 100;

上記ストアドをトリガーで紐づけます。

CREATE TRIGGER trg_master1_z AFTER INSERT OR UPDATE OR DELETE
    ON sample.master1 FOR EACH ROW
EXECUTE PROCEDURE sample.fnc_logged_audit();

CREATE TRIGGER trg_master2_z AFTER INSERT OR UPDATE OR DELETE
    ON sample.master2 FOR EACH ROW
EXECUTE PROCEDURE sample.fnc_logged_audit();

CREATE TRIGGER trg_sub_master1_z AFTER INSERT OR UPDATE OR DELETE
    ON sample.sub_master1 FOR EACH ROW
EXECUTE PROCEDURE sample.fnc_logged_audit();

このように関連付けたうえで、
sample.master1または sample.master2または sample.sub_master1
に対してINSERT|UPDATE|DELETEを実行すると
auditスキーマの「同名+_audit」テーブルの上に履歴レコードが追加されます。

おわりに

一旦ここまでとして何か思い出したらまた第二弾を記事作ろうと思います。
なんかarray系のネタがおおいなぁって思いました。
こういう融通の利き方をみたりすると、postgresは開発者に寄り添ってるなぁって思ってます。

なにはともあれ、とにかく正規化されてなさすぎるDBと戦ってる方々への助けと慣れれば幸いです。

間違いや、ご指摘事項お待ちしております。

insight

Discussion