🔬

DataformでSQLFluffを使いたい

2024/08/19に公開

はじめに

SQLFluffは、SQLコードの静的解析およびフォーマッティングが可能なOSSツールです。
https://sqlfluff.com/
今回は夏休みの自由研究として、これをdataformに適用することを考えます。

モチベーション

dataformではcompileやdry-runの機能がありますが、チーム開発でSQLをより安全に管理したい場合、それらの機能だけでは若干役不足感が否めず、SQLコードであれ一定の規約の元に秩序を作り安全にコードの反映を繰り返せる体制を作りたいというモチベがあります。というか、ほぼこのスレッドの質問主の方と同じモチベです。そして、これを単純にやろうとした際に回答者の方の内容にあるように、dataform独自に拡張されたsqlx構文の影響で簡単にはsqlfluffの恩恵を受けることが現時点ではできません。具体的にはconfigブロックやjs記述で怒られてしまいます。sqlfluffがsqlxの拡張構文に直接対応してくれる未来が一番望ましいですが、これを何とかしようというのが本記事の主旨です。
https://www.googlecloudcommunity.com/gc/Data-Analytics/Dataform-CI-CD/m-p/720576

どうするか

これも元スレに記述があることですが、要するに純粋な.sqlの部分と.sqlxの部分を構文的に分離して解析できれば良いわけです。この分離にはdataform compileコマンドを活用することができます。dataform compileでは--jsonオプションをつけることにより、json形式でコンパイル結果を出力することができます。例えば、以下のようなsqlファイルがあった場合に次のような出力が得られます。

test.sqlx
-- This is an example SQLX file to help you learn the basics of Dataform.
-- Visit https://cloud.google.com/dataform/docs/how-to for more information on how to configure your SQL workflow.

-- You can delete this file, then commit and push your changes to your repository when you are ready.

-- Config blocks allow you to configure, document, and test your data assets.
config {
  type: "table", // Creates a view in BigQuery. Try changing to "table" instead.
  columns: {
    test: "A description for the test column", // Column descriptions are pushed to BigQuery.
  }
}

-- The rest of a SQLX file contains your SELECT statement used to create the table.

SELECT 1 as test

output.json
{
    "tables": [
        {
            "type": "table",
            "target": {
                "schema": "dataform",
                "name": "test",
                "database": "test"
            },
            "query": "-- This is an example SQLX file to help you learn the basics of Dataform.\n-- Visit https://cloud.google.com/dataform/docs/how-to for more information on how to configure your SQL workflow.\n\n-- You can delete this file, then commit and push your changes to your repository when you are ready.\n\n-- Config blocks allow you to configure, document, and test your data assets.\n\n\n-- The rest of a SQLX file contains your SELECT statement used to create the table.\n\nSELECT 1 as test\n",
            "disabled": false,
            "fileName": "definitions/test.sqlx",
            "actionDescriptor": {
                "columns": [
                    {
                        "description": "A description for the test column",
                        "path": [
                            "test"
                        ]
                    }
                ]
            },
            "canonicalTarget": {
                "schema": "dataform",
                "name": "test",
                "database": "test"
            },
            "enumType": "TABLE"
        }
    ],
    "projectConfig": {
        "warehouse": "bigquery",
        "defaultSchema": "dataform",
        "assertionSchema": "dataform_assertions",
        "defaultDatabase": "test",
        "defaultLocation": "asia-northeast1"
    },
    "graphErrors": {},
    "dataformCoreVersion": "3.0.2",
    "targets": [
        {
            "schema": "dataform",
            "name": "test",
            "database": "test"
        }
    ]
}

この出力を見ると各tables要素の中のqueryフィールドにcompileされたsqlの内容が記述されています。そのため、こいつを解析してsqlfluffにかけることができれば目的が達成できそうです。
その処理は以下のようなpythonスクリプトで簡単に実行できます。

main.py
import json
import sys
import subprocess
from pathlib import Path
import os
DEFAULT_TMP_DIR = "___tmp"

def compile_dataform(project_dir:str)->str:
    result = subprocess.run(
        ['dataform', 'compile', '--json', '--project-dir', project_dir],
        check=True,  
        text=True,  
        capture_output=True 
    )
    return result.stdout

def parse_compiled_json(compiled_json_str:str):
    compiled_json = json.loads(compiled_json_str)
    for table in compiled_json["tables"]:
        path = Path(table['fileName'])
        output_directory = DEFAULT_TMP_DIR / path.parent
        os.makedirs(output_directory , exist_ok=True)

        output_path = Path(output_directory) / path.name.replace(".sqlx",".sql")
        
        with open(output_path,"w") as f:
            f.write(table["query"])

def exec_sqlfluff():
    try:
        subprocess.run(
            ['sqlfluff', 'lint',DEFAULT_TMP_DIR],
            check=True,  
        )
        return True
    except  subprocess.CalledProcessError as e:
        return False


project_directory = sys.argv[1]
compiled_json_str = compile_dataform(project_directory)
parse_compiled_json(compiled_json_str)
is_sucess = exec_sqlfluff()

if(not is_sucess):
    sys.exit(-1)

これを上記のtest.sqlxを含むdataformプロジェクトに対して実行すると以下の出力が得られます。

stdout
== [___tmp/definitions/test.sql] FAIL                                                                                                                                                                                    
L:  11 | P:  10 | CP01 | Keywords must be consistently upper case.
                       | [capitalisation.keywords]
All Finished 📜 🎉!

※sqlfluffのpythonAPIもあるのですが、なぜsubprocessで直接実行しているかというと出力を自分で整形するのが面倒だったからです。

最後に

思いつきでやりましたが、簡易にsql部分を分離することでsqlfluffを適用することができました。これをもう少し整理してGitHubのcustom actionで使えるようにすればCI/CDに組み込むことも簡単そうです。本当はそこまでの内容をこの記事内でやりたかったんですが、一旦夏休み最終日で時間切れということでここまでで記事をまとめました。(近日中には何かアップデートして追記したい。する。はず。)

OPTIMINDテックブログ

Discussion